linux/net/ceph/osd_client.c
<<
>>
Prefs
   1
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/module.h>
   5#include <linux/err.h>
   6#include <linux/highmem.h>
   7#include <linux/mm.h>
   8#include <linux/pagemap.h>
   9#include <linux/slab.h>
  10#include <linux/uaccess.h>
  11#ifdef CONFIG_BLOCK
  12#include <linux/bio.h>
  13#endif
  14
  15#include <linux/ceph/libceph.h>
  16#include <linux/ceph/osd_client.h>
  17#include <linux/ceph/messenger.h>
  18#include <linux/ceph/decode.h>
  19#include <linux/ceph/auth.h>
  20#include <linux/ceph/pagelist.h>
  21
  22#define OSD_OP_FRONT_LEN        4096
  23#define OSD_OPREPLY_FRONT_LEN   512
  24
  25static struct kmem_cache        *ceph_osd_request_cache;
  26
  27static const struct ceph_connection_operations osd_con_ops;
  28
  29static void __send_queued(struct ceph_osd_client *osdc);
  30static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
  31static void __register_request(struct ceph_osd_client *osdc,
  32                               struct ceph_osd_request *req);
  33static void __unregister_request(struct ceph_osd_client *osdc,
  34                                 struct ceph_osd_request *req);
  35static void __unregister_linger_request(struct ceph_osd_client *osdc,
  36                                        struct ceph_osd_request *req);
  37static void __enqueue_request(struct ceph_osd_request *req);
  38static void __send_request(struct ceph_osd_client *osdc,
  39                           struct ceph_osd_request *req);
  40
  41/*
  42 * Implement client access to distributed object storage cluster.
  43 *
  44 * All data objects are stored within a cluster/cloud of OSDs, or
  45 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
  46 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
  47 * remote daemons serving up and coordinating consistent and safe
  48 * access to storage.
  49 *
  50 * Cluster membership and the mapping of data objects onto storage devices
  51 * are described by the osd map.
  52 *
  53 * We keep track of pending OSD requests (read, write), resubmit
  54 * requests to different OSDs when the cluster topology/data layout
  55 * change, or retry the affected requests when the communications
  56 * channel with an OSD is reset.
  57 */
  58
  59/*
  60 * calculate the mapping of a file extent onto an object, and fill out the
  61 * request accordingly.  shorten extent as necessary if it crosses an
  62 * object boundary.
  63 *
  64 * fill osd op in request message.
  65 */
  66static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
  67                        u64 *objnum, u64 *objoff, u64 *objlen)
  68{
  69        u64 orig_len = *plen;
  70        int r;
  71
  72        /* object extent? */
  73        r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
  74                                          objoff, objlen);
  75        if (r < 0)
  76                return r;
  77        if (*objlen < orig_len) {
  78                *plen = *objlen;
  79                dout(" skipping last %llu, final file extent %llu~%llu\n",
  80                     orig_len - *plen, off, *plen);
  81        }
  82
  83        dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
  84
  85        return 0;
  86}
  87
  88static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
  89{
  90        memset(osd_data, 0, sizeof (*osd_data));
  91        osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
  92}
  93
  94static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
  95                        struct page **pages, u64 length, u32 alignment,
  96                        bool pages_from_pool, bool own_pages)
  97{
  98        osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
  99        osd_data->pages = pages;
 100        osd_data->length = length;
 101        osd_data->alignment = alignment;
 102        osd_data->pages_from_pool = pages_from_pool;
 103        osd_data->own_pages = own_pages;
 104}
 105
 106static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 107                        struct ceph_pagelist *pagelist)
 108{
 109        osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
 110        osd_data->pagelist = pagelist;
 111}
 112
 113#ifdef CONFIG_BLOCK
 114static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 115                        struct bio *bio, size_t bio_length)
 116{
 117        osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
 118        osd_data->bio = bio;
 119        osd_data->bio_length = bio_length;
 120}
 121#endif /* CONFIG_BLOCK */
 122
 123#define osd_req_op_data(oreq, whch, typ, fld)   \
 124        ({                                              \
 125                BUG_ON(whch >= (oreq)->r_num_ops);      \
 126                &(oreq)->r_ops[whch].typ.fld;           \
 127        })
 128
 129static struct ceph_osd_data *
 130osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
 131{
 132        BUG_ON(which >= osd_req->r_num_ops);
 133
 134        return &osd_req->r_ops[which].raw_data_in;
 135}
 136
 137struct ceph_osd_data *
 138osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
 139                        unsigned int which)
 140{
 141        return osd_req_op_data(osd_req, which, extent, osd_data);
 142}
 143EXPORT_SYMBOL(osd_req_op_extent_osd_data);
 144
 145struct ceph_osd_data *
 146osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
 147                        unsigned int which)
 148{
 149        return osd_req_op_data(osd_req, which, cls, response_data);
 150}
 151EXPORT_SYMBOL(osd_req_op_cls_response_data);    /* ??? */
 152
 153void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
 154                        unsigned int which, struct page **pages,
 155                        u64 length, u32 alignment,
 156                        bool pages_from_pool, bool own_pages)
 157{
 158        struct ceph_osd_data *osd_data;
 159
 160        osd_data = osd_req_op_raw_data_in(osd_req, which);
 161        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 162                                pages_from_pool, own_pages);
 163}
 164EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
 165
 166void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
 167                        unsigned int which, struct page **pages,
 168                        u64 length, u32 alignment,
 169                        bool pages_from_pool, bool own_pages)
 170{
 171        struct ceph_osd_data *osd_data;
 172
 173        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 174        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 175                                pages_from_pool, own_pages);
 176}
 177EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
 178
 179void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
 180                        unsigned int which, struct ceph_pagelist *pagelist)
 181{
 182        struct ceph_osd_data *osd_data;
 183
 184        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 185        ceph_osd_data_pagelist_init(osd_data, pagelist);
 186}
 187EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 188
 189#ifdef CONFIG_BLOCK
 190void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 191                        unsigned int which, struct bio *bio, size_t bio_length)
 192{
 193        struct ceph_osd_data *osd_data;
 194
 195        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 196        ceph_osd_data_bio_init(osd_data, bio, bio_length);
 197}
 198EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 199#endif /* CONFIG_BLOCK */
 200
 201static void osd_req_op_cls_request_info_pagelist(
 202                        struct ceph_osd_request *osd_req,
 203                        unsigned int which, struct ceph_pagelist *pagelist)
 204{
 205        struct ceph_osd_data *osd_data;
 206
 207        osd_data = osd_req_op_data(osd_req, which, cls, request_info);
 208        ceph_osd_data_pagelist_init(osd_data, pagelist);
 209}
 210
 211void osd_req_op_cls_request_data_pagelist(
 212                        struct ceph_osd_request *osd_req,
 213                        unsigned int which, struct ceph_pagelist *pagelist)
 214{
 215        struct ceph_osd_data *osd_data;
 216
 217        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 218        ceph_osd_data_pagelist_init(osd_data, pagelist);
 219}
 220EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
 221
 222void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
 223                        unsigned int which, struct page **pages, u64 length,
 224                        u32 alignment, bool pages_from_pool, bool own_pages)
 225{
 226        struct ceph_osd_data *osd_data;
 227
 228        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 229        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 230                                pages_from_pool, own_pages);
 231}
 232EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 233
 234void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 235                        unsigned int which, struct page **pages, u64 length,
 236                        u32 alignment, bool pages_from_pool, bool own_pages)
 237{
 238        struct ceph_osd_data *osd_data;
 239
 240        osd_data = osd_req_op_data(osd_req, which, cls, response_data);
 241        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 242                                pages_from_pool, own_pages);
 243}
 244EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
 245
 246static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 247{
 248        switch (osd_data->type) {
 249        case CEPH_OSD_DATA_TYPE_NONE:
 250                return 0;
 251        case CEPH_OSD_DATA_TYPE_PAGES:
 252                return osd_data->length;
 253        case CEPH_OSD_DATA_TYPE_PAGELIST:
 254                return (u64)osd_data->pagelist->length;
 255#ifdef CONFIG_BLOCK
 256        case CEPH_OSD_DATA_TYPE_BIO:
 257                return (u64)osd_data->bio_length;
 258#endif /* CONFIG_BLOCK */
 259        default:
 260                WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
 261                return 0;
 262        }
 263}
 264
 265static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
 266{
 267        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
 268                int num_pages;
 269
 270                num_pages = calc_pages_for((u64)osd_data->alignment,
 271                                                (u64)osd_data->length);
 272                ceph_release_page_vector(osd_data->pages, num_pages);
 273        }
 274        ceph_osd_data_init(osd_data);
 275}
 276
 277static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 278                        unsigned int which)
 279{
 280        struct ceph_osd_req_op *op;
 281
 282        BUG_ON(which >= osd_req->r_num_ops);
 283        op = &osd_req->r_ops[which];
 284
 285        switch (op->op) {
 286        case CEPH_OSD_OP_READ:
 287        case CEPH_OSD_OP_WRITE:
 288        case CEPH_OSD_OP_WRITEFULL:
 289                ceph_osd_data_release(&op->extent.osd_data);
 290                break;
 291        case CEPH_OSD_OP_CALL:
 292                ceph_osd_data_release(&op->cls.request_info);
 293                ceph_osd_data_release(&op->cls.request_data);
 294                ceph_osd_data_release(&op->cls.response_data);
 295                break;
 296        case CEPH_OSD_OP_SETXATTR:
 297        case CEPH_OSD_OP_CMPXATTR:
 298                ceph_osd_data_release(&op->xattr.osd_data);
 299                break;
 300        case CEPH_OSD_OP_STAT:
 301                ceph_osd_data_release(&op->raw_data_in);
 302                break;
 303        default:
 304                break;
 305        }
 306}
 307
 308/*
 309 * requests
 310 */
 311static void ceph_osdc_release_request(struct kref *kref)
 312{
 313        struct ceph_osd_request *req = container_of(kref,
 314                                            struct ceph_osd_request, r_kref);
 315        unsigned int which;
 316
 317        dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
 318             req->r_request, req->r_reply);
 319        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
 320        WARN_ON(!list_empty(&req->r_req_lru_item));
 321        WARN_ON(!list_empty(&req->r_osd_item));
 322        WARN_ON(!list_empty(&req->r_linger_item));
 323        WARN_ON(!list_empty(&req->r_linger_osd_item));
 324        WARN_ON(req->r_osd);
 325
 326        if (req->r_request)
 327                ceph_msg_put(req->r_request);
 328        if (req->r_reply) {
 329                ceph_msg_revoke_incoming(req->r_reply);
 330                ceph_msg_put(req->r_reply);
 331        }
 332
 333        for (which = 0; which < req->r_num_ops; which++)
 334                osd_req_op_data_release(req, which);
 335
 336        ceph_put_snap_context(req->r_snapc);
 337        if (req->r_mempool)
 338                mempool_free(req, req->r_osdc->req_mempool);
 339        else
 340                kmem_cache_free(ceph_osd_request_cache, req);
 341
 342}
 343
 344void ceph_osdc_get_request(struct ceph_osd_request *req)
 345{
 346        dout("%s %p (was %d)\n", __func__, req,
 347             atomic_read(&req->r_kref.refcount));
 348        kref_get(&req->r_kref);
 349}
 350EXPORT_SYMBOL(ceph_osdc_get_request);
 351
 352void ceph_osdc_put_request(struct ceph_osd_request *req)
 353{
 354        dout("%s %p (was %d)\n", __func__, req,
 355             atomic_read(&req->r_kref.refcount));
 356        kref_put(&req->r_kref, ceph_osdc_release_request);
 357}
 358EXPORT_SYMBOL(ceph_osdc_put_request);
 359
 360struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 361                                               struct ceph_snap_context *snapc,
 362                                               unsigned int num_ops,
 363                                               bool use_mempool,
 364                                               gfp_t gfp_flags)
 365{
 366        struct ceph_osd_request *req;
 367        struct ceph_msg *msg;
 368        size_t msg_size;
 369
 370        BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
 371        BUG_ON(num_ops > CEPH_OSD_MAX_OP);
 372
 373        msg_size = 4 + 4 + 8 + 8 + 4+8;
 374        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
 375        msg_size += 1 + 8 + 4 + 4;     /* pg_t */
 376        msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
 377        msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
 378        msg_size += 8;  /* snapid */
 379        msg_size += 8;  /* snap_seq */
 380        msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
 381        msg_size += 4;
 382
 383        if (use_mempool) {
 384                req = mempool_alloc(osdc->req_mempool, gfp_flags);
 385                memset(req, 0, sizeof(*req));
 386        } else {
 387                req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
 388        }
 389        if (req == NULL)
 390                return NULL;
 391
 392        req->r_osdc = osdc;
 393        req->r_mempool = use_mempool;
 394        req->r_num_ops = num_ops;
 395
 396        kref_init(&req->r_kref);
 397        init_completion(&req->r_completion);
 398        init_completion(&req->r_safe_completion);
 399        RB_CLEAR_NODE(&req->r_node);
 400        INIT_LIST_HEAD(&req->r_unsafe_item);
 401        INIT_LIST_HEAD(&req->r_linger_item);
 402        INIT_LIST_HEAD(&req->r_linger_osd_item);
 403        INIT_LIST_HEAD(&req->r_req_lru_item);
 404        INIT_LIST_HEAD(&req->r_osd_item);
 405
 406        req->r_base_oloc.pool = -1;
 407        req->r_target_oloc.pool = -1;
 408
 409        /* create reply message */
 410        if (use_mempool)
 411                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 412        else
 413                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
 414                                   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
 415        if (!msg) {
 416                ceph_osdc_put_request(req);
 417                return NULL;
 418        }
 419        req->r_reply = msg;
 420
 421        /* create request message; allow space for oid */
 422        if (use_mempool)
 423                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 424        else
 425                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
 426        if (!msg) {
 427                ceph_osdc_put_request(req);
 428                return NULL;
 429        }
 430
 431        memset(msg->front.iov_base, 0, msg->front.iov_len);
 432
 433        req->r_request = msg;
 434
 435        return req;
 436}
 437EXPORT_SYMBOL(ceph_osdc_alloc_request);
 438
 439static bool osd_req_opcode_valid(u16 opcode)
 440{
 441        switch (opcode) {
 442#define GENERATE_CASE(op, opcode, str)  case CEPH_OSD_OP_##op: return true;
 443__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
 444#undef GENERATE_CASE
 445        default:
 446                return false;
 447        }
 448}
 449
 450/*
 451 * This is an osd op init function for opcodes that have no data or
 452 * other information associated with them.  It also serves as a
 453 * common init routine for all the other init functions, below.
 454 */
 455static struct ceph_osd_req_op *
 456_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
 457                 u16 opcode, u32 flags)
 458{
 459        struct ceph_osd_req_op *op;
 460
 461        BUG_ON(which >= osd_req->r_num_ops);
 462        BUG_ON(!osd_req_opcode_valid(opcode));
 463
 464        op = &osd_req->r_ops[which];
 465        memset(op, 0, sizeof (*op));
 466        op->op = opcode;
 467        op->flags = flags;
 468
 469        return op;
 470}
 471
 472void osd_req_op_init(struct ceph_osd_request *osd_req,
 473                     unsigned int which, u16 opcode, u32 flags)
 474{
 475        (void)_osd_req_op_init(osd_req, which, opcode, flags);
 476}
 477EXPORT_SYMBOL(osd_req_op_init);
 478
 479void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 480                                unsigned int which, u16 opcode,
 481                                u64 offset, u64 length,
 482                                u64 truncate_size, u32 truncate_seq)
 483{
 484        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 485                                                      opcode, 0);
 486        size_t payload_len = 0;
 487
 488        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 489               opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
 490               opcode != CEPH_OSD_OP_TRUNCATE);
 491
 492        op->extent.offset = offset;
 493        op->extent.length = length;
 494        op->extent.truncate_size = truncate_size;
 495        op->extent.truncate_seq = truncate_seq;
 496        if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
 497                payload_len += length;
 498
 499        op->payload_len = payload_len;
 500}
 501EXPORT_SYMBOL(osd_req_op_extent_init);
 502
 503void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 504                                unsigned int which, u64 length)
 505{
 506        struct ceph_osd_req_op *op;
 507        u64 previous;
 508
 509        BUG_ON(which >= osd_req->r_num_ops);
 510        op = &osd_req->r_ops[which];
 511        previous = op->extent.length;
 512
 513        if (length == previous)
 514                return;         /* Nothing to do */
 515        BUG_ON(length > previous);
 516
 517        op->extent.length = length;
 518        op->payload_len -= previous - length;
 519}
 520EXPORT_SYMBOL(osd_req_op_extent_update);
 521
 522void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 523                        u16 opcode, const char *class, const char *method)
 524{
 525        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 526                                                      opcode, 0);
 527        struct ceph_pagelist *pagelist;
 528        size_t payload_len = 0;
 529        size_t size;
 530
 531        BUG_ON(opcode != CEPH_OSD_OP_CALL);
 532
 533        pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
 534        BUG_ON(!pagelist);
 535        ceph_pagelist_init(pagelist);
 536
 537        op->cls.class_name = class;
 538        size = strlen(class);
 539        BUG_ON(size > (size_t) U8_MAX);
 540        op->cls.class_len = size;
 541        ceph_pagelist_append(pagelist, class, size);
 542        payload_len += size;
 543
 544        op->cls.method_name = method;
 545        size = strlen(method);
 546        BUG_ON(size > (size_t) U8_MAX);
 547        op->cls.method_len = size;
 548        ceph_pagelist_append(pagelist, method, size);
 549        payload_len += size;
 550
 551        osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 552
 553        op->cls.argc = 0;       /* currently unused */
 554
 555        op->payload_len = payload_len;
 556}
 557EXPORT_SYMBOL(osd_req_op_cls_init);
 558
 559int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 560                          u16 opcode, const char *name, const void *value,
 561                          size_t size, u8 cmp_op, u8 cmp_mode)
 562{
 563        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 564                                                      opcode, 0);
 565        struct ceph_pagelist *pagelist;
 566        size_t payload_len;
 567
 568        BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
 569
 570        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
 571        if (!pagelist)
 572                return -ENOMEM;
 573
 574        ceph_pagelist_init(pagelist);
 575
 576        payload_len = strlen(name);
 577        op->xattr.name_len = payload_len;
 578        ceph_pagelist_append(pagelist, name, payload_len);
 579
 580        op->xattr.value_len = size;
 581        ceph_pagelist_append(pagelist, value, size);
 582        payload_len += size;
 583
 584        op->xattr.cmp_op = cmp_op;
 585        op->xattr.cmp_mode = cmp_mode;
 586
 587        ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
 588        op->payload_len = payload_len;
 589        return 0;
 590}
 591EXPORT_SYMBOL(osd_req_op_xattr_init);
 592
 593void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 594                                unsigned int which, u16 opcode,
 595                                u64 cookie, u64 version, int flag)
 596{
 597        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 598                                                      opcode, 0);
 599
 600        BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
 601
 602        op->watch.cookie = cookie;
 603        op->watch.ver = version;
 604        if (opcode == CEPH_OSD_OP_WATCH && flag)
 605                op->watch.flag = (u8)1;
 606}
 607EXPORT_SYMBOL(osd_req_op_watch_init);
 608
 609void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 610                                unsigned int which,
 611                                u64 expected_object_size,
 612                                u64 expected_write_size)
 613{
 614        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 615                                                      CEPH_OSD_OP_SETALLOCHINT,
 616                                                      0);
 617
 618        op->alloc_hint.expected_object_size = expected_object_size;
 619        op->alloc_hint.expected_write_size = expected_write_size;
 620
 621        /*
 622         * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
 623         * not worth a feature bit.  Set FAILOK per-op flag to make
 624         * sure older osds don't trip over an unsupported opcode.
 625         */
 626        op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
 627}
 628EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
 629
 630static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 631                                struct ceph_osd_data *osd_data)
 632{
 633        u64 length = ceph_osd_data_length(osd_data);
 634
 635        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
 636                BUG_ON(length > (u64) SIZE_MAX);
 637                if (length)
 638                        ceph_msg_data_add_pages(msg, osd_data->pages,
 639                                        length, osd_data->alignment);
 640        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
 641                BUG_ON(!length);
 642                ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
 643#ifdef CONFIG_BLOCK
 644        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
 645                ceph_msg_data_add_bio(msg, osd_data->bio, length);
 646#endif
 647        } else {
 648                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 649        }
 650}
 651
 652static u64 osd_req_encode_op(struct ceph_osd_request *req,
 653                              struct ceph_osd_op *dst, unsigned int which)
 654{
 655        struct ceph_osd_req_op *src;
 656        struct ceph_osd_data *osd_data;
 657        u64 request_data_len = 0;
 658        u64 data_length;
 659
 660        BUG_ON(which >= req->r_num_ops);
 661        src = &req->r_ops[which];
 662        if (WARN_ON(!osd_req_opcode_valid(src->op))) {
 663                pr_err("unrecognized osd opcode %d\n", src->op);
 664
 665                return 0;
 666        }
 667
 668        switch (src->op) {
 669        case CEPH_OSD_OP_STAT:
 670                osd_data = &src->raw_data_in;
 671                ceph_osdc_msg_data_add(req->r_reply, osd_data);
 672                break;
 673        case CEPH_OSD_OP_READ:
 674        case CEPH_OSD_OP_WRITE:
 675        case CEPH_OSD_OP_WRITEFULL:
 676        case CEPH_OSD_OP_ZERO:
 677        case CEPH_OSD_OP_TRUNCATE:
 678                if (src->op == CEPH_OSD_OP_WRITE ||
 679                    src->op == CEPH_OSD_OP_WRITEFULL)
 680                        request_data_len = src->extent.length;
 681                dst->extent.offset = cpu_to_le64(src->extent.offset);
 682                dst->extent.length = cpu_to_le64(src->extent.length);
 683                dst->extent.truncate_size =
 684                        cpu_to_le64(src->extent.truncate_size);
 685                dst->extent.truncate_seq =
 686                        cpu_to_le32(src->extent.truncate_seq);
 687                osd_data = &src->extent.osd_data;
 688                if (src->op == CEPH_OSD_OP_WRITE ||
 689                    src->op == CEPH_OSD_OP_WRITEFULL)
 690                        ceph_osdc_msg_data_add(req->r_request, osd_data);
 691                else
 692                        ceph_osdc_msg_data_add(req->r_reply, osd_data);
 693                break;
 694        case CEPH_OSD_OP_CALL:
 695                dst->cls.class_len = src->cls.class_len;
 696                dst->cls.method_len = src->cls.method_len;
 697                osd_data = &src->cls.request_info;
 698                ceph_osdc_msg_data_add(req->r_request, osd_data);
 699                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
 700                request_data_len = osd_data->pagelist->length;
 701
 702                osd_data = &src->cls.request_data;
 703                data_length = ceph_osd_data_length(osd_data);
 704                if (data_length) {
 705                        BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
 706                        dst->cls.indata_len = cpu_to_le32(data_length);
 707                        ceph_osdc_msg_data_add(req->r_request, osd_data);
 708                        src->payload_len += data_length;
 709                        request_data_len += data_length;
 710                }
 711                osd_data = &src->cls.response_data;
 712                ceph_osdc_msg_data_add(req->r_reply, osd_data);
 713                break;
 714        case CEPH_OSD_OP_STARTSYNC:
 715                break;
 716        case CEPH_OSD_OP_NOTIFY_ACK:
 717        case CEPH_OSD_OP_WATCH:
 718                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 719                dst->watch.ver = cpu_to_le64(src->watch.ver);
 720                dst->watch.flag = src->watch.flag;
 721                break;
 722        case CEPH_OSD_OP_SETALLOCHINT:
 723                dst->alloc_hint.expected_object_size =
 724                    cpu_to_le64(src->alloc_hint.expected_object_size);
 725                dst->alloc_hint.expected_write_size =
 726                    cpu_to_le64(src->alloc_hint.expected_write_size);
 727                break;
 728        case CEPH_OSD_OP_SETXATTR:
 729        case CEPH_OSD_OP_CMPXATTR:
 730                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
 731                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
 732                dst->xattr.cmp_op = src->xattr.cmp_op;
 733                dst->xattr.cmp_mode = src->xattr.cmp_mode;
 734                osd_data = &src->xattr.osd_data;
 735                ceph_osdc_msg_data_add(req->r_request, osd_data);
 736                request_data_len = osd_data->pagelist->length;
 737                break;
 738        case CEPH_OSD_OP_CREATE:
 739        case CEPH_OSD_OP_DELETE:
 740                break;
 741        default:
 742                pr_err("unsupported osd opcode %s\n",
 743                        ceph_osd_op_name(src->op));
 744                WARN_ON(1);
 745
 746                return 0;
 747        }
 748
 749        dst->op = cpu_to_le16(src->op);
 750        dst->flags = cpu_to_le32(src->flags);
 751        dst->payload_len = cpu_to_le32(src->payload_len);
 752
 753        return request_data_len;
 754}
 755
 756/*
 757 * build new request AND message, calculate layout, and adjust file
 758 * extent as needed.
 759 *
 760 * if the file was recently truncated, we include information about its
 761 * old and new size so that the object can be updated appropriately.  (we
 762 * avoid synchronously deleting truncated objects because it's slow.)
 763 *
 764 * if @do_sync, include a 'startsync' command so that the osd will flush
 765 * data quickly.
 766 */
 767struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 768                                               struct ceph_file_layout *layout,
 769                                               struct ceph_vino vino,
 770                                               u64 off, u64 *plen,
 771                                               unsigned int which, int num_ops,
 772                                               int opcode, int flags,
 773                                               struct ceph_snap_context *snapc,
 774                                               u32 truncate_seq,
 775                                               u64 truncate_size,
 776                                               bool use_mempool)
 777{
 778        struct ceph_osd_request *req;
 779        u64 objnum = 0;
 780        u64 objoff = 0;
 781        u64 objlen = 0;
 782        int r;
 783
 784        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 785               opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
 786               opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
 787
 788        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 789                                        GFP_NOFS);
 790        if (!req)
 791                return ERR_PTR(-ENOMEM);
 792
 793        req->r_flags = flags;
 794
 795        /* calculate max write size */
 796        r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
 797        if (r < 0) {
 798                ceph_osdc_put_request(req);
 799                return ERR_PTR(r);
 800        }
 801
 802        if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
 803                osd_req_op_init(req, which, opcode, 0);
 804        } else {
 805                u32 object_size = le32_to_cpu(layout->fl_object_size);
 806                u32 object_base = off - objoff;
 807                if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
 808                        if (truncate_size <= object_base) {
 809                                truncate_size = 0;
 810                        } else {
 811                                truncate_size -= object_base;
 812                                if (truncate_size > object_size)
 813                                        truncate_size = object_size;
 814                        }
 815                }
 816                osd_req_op_extent_init(req, which, opcode, objoff, objlen,
 817                                       truncate_size, truncate_seq);
 818        }
 819
 820        req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
 821
 822        snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
 823                 "%llx.%08llx", vino.ino, objnum);
 824        req->r_base_oid.name_len = strlen(req->r_base_oid.name);
 825
 826        return req;
 827}
 828EXPORT_SYMBOL(ceph_osdc_new_request);
 829
 830/*
 831 * We keep osd requests in an rbtree, sorted by ->r_tid.
 832 */
 833static void __insert_request(struct ceph_osd_client *osdc,
 834                             struct ceph_osd_request *new)
 835{
 836        struct rb_node **p = &osdc->requests.rb_node;
 837        struct rb_node *parent = NULL;
 838        struct ceph_osd_request *req = NULL;
 839
 840        while (*p) {
 841                parent = *p;
 842                req = rb_entry(parent, struct ceph_osd_request, r_node);
 843                if (new->r_tid < req->r_tid)
 844                        p = &(*p)->rb_left;
 845                else if (new->r_tid > req->r_tid)
 846                        p = &(*p)->rb_right;
 847                else
 848                        BUG();
 849        }
 850
 851        rb_link_node(&new->r_node, parent, p);
 852        rb_insert_color(&new->r_node, &osdc->requests);
 853}
 854
 855static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
 856                                                 u64 tid)
 857{
 858        struct ceph_osd_request *req;
 859        struct rb_node *n = osdc->requests.rb_node;
 860
 861        while (n) {
 862                req = rb_entry(n, struct ceph_osd_request, r_node);
 863                if (tid < req->r_tid)
 864                        n = n->rb_left;
 865                else if (tid > req->r_tid)
 866                        n = n->rb_right;
 867                else
 868                        return req;
 869        }
 870        return NULL;
 871}
 872
 873static struct ceph_osd_request *
 874__lookup_request_ge(struct ceph_osd_client *osdc,
 875                    u64 tid)
 876{
 877        struct ceph_osd_request *req;
 878        struct rb_node *n = osdc->requests.rb_node;
 879
 880        while (n) {
 881                req = rb_entry(n, struct ceph_osd_request, r_node);
 882                if (tid < req->r_tid) {
 883                        if (!n->rb_left)
 884                                return req;
 885                        n = n->rb_left;
 886                } else if (tid > req->r_tid) {
 887                        n = n->rb_right;
 888                } else {
 889                        return req;
 890                }
 891        }
 892        return NULL;
 893}
 894
 895static void __kick_linger_request(struct ceph_osd_request *req)
 896{
 897        struct ceph_osd_client *osdc = req->r_osdc;
 898        struct ceph_osd *osd = req->r_osd;
 899
 900        /*
 901         * Linger requests need to be resent with a new tid to avoid
 902         * the dup op detection logic on the OSDs.  Achieve this with
 903         * a re-register dance instead of open-coding.
 904         */
 905        ceph_osdc_get_request(req);
 906        if (!list_empty(&req->r_linger_item))
 907                __unregister_linger_request(osdc, req);
 908        else
 909                __unregister_request(osdc, req);
 910        __register_request(osdc, req);
 911        ceph_osdc_put_request(req);
 912
 913        /*
 914         * Unless request has been registered as both normal and
 915         * lingering, __unregister{,_linger}_request clears r_osd.
 916         * However, here we need to preserve r_osd to make sure we
 917         * requeue on the same OSD.
 918         */
 919        WARN_ON(req->r_osd || !osd);
 920        req->r_osd = osd;
 921
 922        dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
 923        __enqueue_request(req);
 924}
 925
 926/*
 927 * Resubmit requests pending on the given osd.
 928 */
 929static void __kick_osd_requests(struct ceph_osd_client *osdc,
 930                                struct ceph_osd *osd)
 931{
 932        struct ceph_osd_request *req, *nreq;
 933        LIST_HEAD(resend);
 934        LIST_HEAD(resend_linger);
 935        int err;
 936
 937        dout("%s osd%d\n", __func__, osd->o_osd);
 938        err = __reset_osd(osdc, osd);
 939        if (err)
 940                return;
 941
 942        /*
 943         * Build up a list of requests to resend by traversing the
 944         * osd's list of requests.  Requests for a given object are
 945         * sent in tid order, and that is also the order they're
 946         * kept on this list.  Therefore all requests that are in
 947         * flight will be found first, followed by all requests that
 948         * have not yet been sent.  And to resend requests while
 949         * preserving this order we will want to put any sent
 950         * requests back on the front of the osd client's unsent
 951         * list.
 952         *
 953         * So we build a separate ordered list of already-sent
 954         * requests for the affected osd and splice it onto the
 955         * front of the osd client's unsent list.  Once we've seen a
 956         * request that has not yet been sent we're done.  Those
 957         * requests are already sitting right where they belong.
 958         */
 959        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
 960                if (!req->r_sent)
 961                        break;
 962
 963                if (!req->r_linger) {
 964                        dout("%s requeueing %p tid %llu\n", __func__, req,
 965                             req->r_tid);
 966                        list_move_tail(&req->r_req_lru_item, &resend);
 967                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
 968                } else {
 969                        list_move_tail(&req->r_req_lru_item, &resend_linger);
 970                }
 971        }
 972        list_splice(&resend, &osdc->req_unsent);
 973
 974        /*
 975         * Both registered and not yet registered linger requests are
 976         * enqueued with a new tid on the same OSD.  We add/move them
 977         * to req_unsent/o_requests at the end to keep things in tid
 978         * order.
 979         */
 980        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
 981                                 r_linger_osd_item) {
 982                WARN_ON(!list_empty(&req->r_req_lru_item));
 983                __kick_linger_request(req);
 984        }
 985
 986        list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
 987                __kick_linger_request(req);
 988}
 989
 990/*
 991 * If the osd connection drops, we need to resubmit all requests.
 992 */
 993static void osd_reset(struct ceph_connection *con)
 994{
 995        struct ceph_osd *osd = con->private;
 996        struct ceph_osd_client *osdc;
 997
 998        if (!osd)
 999                return;
1000        dout("osd_reset osd%d\n", osd->o_osd);
1001        osdc = osd->o_osdc;
1002        down_read(&osdc->map_sem);
1003        mutex_lock(&osdc->request_mutex);
1004        __kick_osd_requests(osdc, osd);
1005        __send_queued(osdc);
1006        mutex_unlock(&osdc->request_mutex);
1007        up_read(&osdc->map_sem);
1008}
1009
1010/*
1011 * Track open sessions with osds.
1012 */
1013static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1014{
1015        struct ceph_osd *osd;
1016
1017        osd = kzalloc(sizeof(*osd), GFP_NOFS);
1018        if (!osd)
1019                return NULL;
1020
1021        atomic_set(&osd->o_ref, 1);
1022        osd->o_osdc = osdc;
1023        osd->o_osd = onum;
1024        RB_CLEAR_NODE(&osd->o_node);
1025        INIT_LIST_HEAD(&osd->o_requests);
1026        INIT_LIST_HEAD(&osd->o_linger_requests);
1027        INIT_LIST_HEAD(&osd->o_osd_lru);
1028        osd->o_incarnation = 1;
1029
1030        ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1031
1032        INIT_LIST_HEAD(&osd->o_keepalive_item);
1033        return osd;
1034}
1035
1036static struct ceph_osd *get_osd(struct ceph_osd *osd)
1037{
1038        if (atomic_inc_not_zero(&osd->o_ref)) {
1039                dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
1040                     atomic_read(&osd->o_ref));
1041                return osd;
1042        } else {
1043                dout("get_osd %p FAIL\n", osd);
1044                return NULL;
1045        }
1046}
1047
1048static void put_osd(struct ceph_osd *osd)
1049{
1050        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
1051             atomic_read(&osd->o_ref) - 1);
1052        if (atomic_dec_and_test(&osd->o_ref)) {
1053                struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
1054
1055                if (osd->o_auth.authorizer)
1056                        ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
1057                kfree(osd);
1058        }
1059}
1060
1061/*
1062 * remove an osd from our map
1063 */
1064static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1065{
1066        dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
1067        WARN_ON(!list_empty(&osd->o_requests));
1068        WARN_ON(!list_empty(&osd->o_linger_requests));
1069
1070        list_del_init(&osd->o_osd_lru);
1071        rb_erase(&osd->o_node, &osdc->osds);
1072        RB_CLEAR_NODE(&osd->o_node);
1073}
1074
1075static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1076{
1077        dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
1078
1079        if (!RB_EMPTY_NODE(&osd->o_node)) {
1080                ceph_con_close(&osd->o_con);
1081                __remove_osd(osdc, osd);
1082                put_osd(osd);
1083        }
1084}
1085
1086static void remove_all_osds(struct ceph_osd_client *osdc)
1087{
1088        dout("%s %p\n", __func__, osdc);
1089        mutex_lock(&osdc->request_mutex);
1090        while (!RB_EMPTY_ROOT(&osdc->osds)) {
1091                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
1092                                                struct ceph_osd, o_node);
1093                remove_osd(osdc, osd);
1094        }
1095        mutex_unlock(&osdc->request_mutex);
1096}
1097
1098static void __move_osd_to_lru(struct ceph_osd_client *osdc,
1099                              struct ceph_osd *osd)
1100{
1101        dout("%s %p\n", __func__, osd);
1102        BUG_ON(!list_empty(&osd->o_osd_lru));
1103
1104        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1105        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1106}
1107
1108static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
1109                                  struct ceph_osd *osd)
1110{
1111        dout("%s %p\n", __func__, osd);
1112
1113        if (list_empty(&osd->o_requests) &&
1114            list_empty(&osd->o_linger_requests))
1115                __move_osd_to_lru(osdc, osd);
1116}
1117
1118static void __remove_osd_from_lru(struct ceph_osd *osd)
1119{
1120        dout("__remove_osd_from_lru %p\n", osd);
1121        if (!list_empty(&osd->o_osd_lru))
1122                list_del_init(&osd->o_osd_lru);
1123}
1124
1125static void remove_old_osds(struct ceph_osd_client *osdc)
1126{
1127        struct ceph_osd *osd, *nosd;
1128
1129        dout("__remove_old_osds %p\n", osdc);
1130        mutex_lock(&osdc->request_mutex);
1131        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
1132                if (time_before(jiffies, osd->lru_ttl))
1133                        break;
1134                remove_osd(osdc, osd);
1135        }
1136        mutex_unlock(&osdc->request_mutex);
1137}
1138
1139/*
1140 * reset osd connect
1141 */
1142static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1143{
1144        struct ceph_entity_addr *peer_addr;
1145
1146        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
1147        if (list_empty(&osd->o_requests) &&
1148            list_empty(&osd->o_linger_requests)) {
1149                remove_osd(osdc, osd);
1150                return -ENODEV;
1151        }
1152
1153        peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
1154        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1155                        !ceph_con_opened(&osd->o_con)) {
1156                struct ceph_osd_request *req;
1157
1158                dout("osd addr hasn't changed and connection never opened, "
1159                     "letting msgr retry\n");
1160                /* touch each r_stamp for handle_timeout()'s benfit */
1161                list_for_each_entry(req, &osd->o_requests, r_osd_item)
1162                        req->r_stamp = jiffies;
1163
1164                return -EAGAIN;
1165        }
1166
1167        ceph_con_close(&osd->o_con);
1168        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1169        osd->o_incarnation++;
1170
1171        return 0;
1172}
1173
1174static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
1175{
1176        struct rb_node **p = &osdc->osds.rb_node;
1177        struct rb_node *parent = NULL;
1178        struct ceph_osd *osd = NULL;
1179
1180        dout("__insert_osd %p osd%d\n", new, new->o_osd);
1181        while (*p) {
1182                parent = *p;
1183                osd = rb_entry(parent, struct ceph_osd, o_node);
1184                if (new->o_osd < osd->o_osd)
1185                        p = &(*p)->rb_left;
1186                else if (new->o_osd > osd->o_osd)
1187                        p = &(*p)->rb_right;
1188                else
1189                        BUG();
1190        }
1191
1192        rb_link_node(&new->o_node, parent, p);
1193        rb_insert_color(&new->o_node, &osdc->osds);
1194}
1195
1196static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
1197{
1198        struct ceph_osd *osd;
1199        struct rb_node *n = osdc->osds.rb_node;
1200
1201        while (n) {
1202                osd = rb_entry(n, struct ceph_osd, o_node);
1203                if (o < osd->o_osd)
1204                        n = n->rb_left;
1205                else if (o > osd->o_osd)
1206                        n = n->rb_right;
1207                else
1208                        return osd;
1209        }
1210        return NULL;
1211}
1212
1213static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
1214{
1215        schedule_delayed_work(&osdc->timeout_work,
1216                              osdc->client->options->osd_keepalive_timeout);
1217}
1218
1219static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
1220{
1221        cancel_delayed_work(&osdc->timeout_work);
1222}
1223
1224/*
1225 * Register request, assign tid.  If this is the first request, set up
1226 * the timeout event.
1227 */
1228static void __register_request(struct ceph_osd_client *osdc,
1229                               struct ceph_osd_request *req)
1230{
1231        req->r_tid = ++osdc->last_tid;
1232        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
1233        dout("__register_request %p tid %lld\n", req, req->r_tid);
1234        __insert_request(osdc, req);
1235        ceph_osdc_get_request(req);
1236        osdc->num_requests++;
1237        if (osdc->num_requests == 1) {
1238                dout(" first request, scheduling timeout\n");
1239                __schedule_osd_timeout(osdc);
1240        }
1241}
1242
1243/*
1244 * called under osdc->request_mutex
1245 */
1246static void __unregister_request(struct ceph_osd_client *osdc,
1247                                 struct ceph_osd_request *req)
1248{
1249        if (RB_EMPTY_NODE(&req->r_node)) {
1250                dout("__unregister_request %p tid %lld not registered\n",
1251                        req, req->r_tid);
1252                return;
1253        }
1254
1255        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
1256        rb_erase(&req->r_node, &osdc->requests);
1257        RB_CLEAR_NODE(&req->r_node);
1258        osdc->num_requests--;
1259
1260        if (req->r_osd) {
1261                /* make sure the original request isn't in flight. */
1262                ceph_msg_revoke(req->r_request);
1263
1264                list_del_init(&req->r_osd_item);
1265                maybe_move_osd_to_lru(osdc, req->r_osd);
1266                if (list_empty(&req->r_linger_osd_item))
1267                        req->r_osd = NULL;
1268        }
1269
1270        list_del_init(&req->r_req_lru_item);
1271        ceph_osdc_put_request(req);
1272
1273        if (osdc->num_requests == 0) {
1274                dout(" no requests, canceling timeout\n");
1275                __cancel_osd_timeout(osdc);
1276        }
1277}
1278
1279/*
1280 * Cancel a previously queued request message
1281 */
1282static void __cancel_request(struct ceph_osd_request *req)
1283{
1284        if (req->r_sent && req->r_osd) {
1285                ceph_msg_revoke(req->r_request);
1286                req->r_sent = 0;
1287        }
1288}
1289
1290static void __register_linger_request(struct ceph_osd_client *osdc,
1291                                    struct ceph_osd_request *req)
1292{
1293        dout("%s %p tid %llu\n", __func__, req, req->r_tid);
1294        WARN_ON(!req->r_linger);
1295
1296        ceph_osdc_get_request(req);
1297        list_add_tail(&req->r_linger_item, &osdc->req_linger);
1298        if (req->r_osd)
1299                list_add_tail(&req->r_linger_osd_item,
1300                              &req->r_osd->o_linger_requests);
1301}
1302
1303static void __unregister_linger_request(struct ceph_osd_client *osdc,
1304                                        struct ceph_osd_request *req)
1305{
1306        WARN_ON(!req->r_linger);
1307
1308        if (list_empty(&req->r_linger_item)) {
1309                dout("%s %p tid %llu not registered\n", __func__, req,
1310                     req->r_tid);
1311                return;
1312        }
1313
1314        dout("%s %p tid %llu\n", __func__, req, req->r_tid);
1315        list_del_init(&req->r_linger_item);
1316
1317        if (req->r_osd) {
1318                list_del_init(&req->r_linger_osd_item);
1319                maybe_move_osd_to_lru(osdc, req->r_osd);
1320                if (list_empty(&req->r_osd_item))
1321                        req->r_osd = NULL;
1322        }
1323        ceph_osdc_put_request(req);
1324}
1325
1326void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1327                                  struct ceph_osd_request *req)
1328{
1329        if (!req->r_linger) {
1330                dout("set_request_linger %p\n", req);
1331                req->r_linger = 1;
1332        }
1333}
1334EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1335
1336/*
1337 * Returns whether a request should be blocked from being sent
1338 * based on the current osdmap and osd_client settings.
1339 *
1340 * Caller should hold map_sem for read.
1341 */
1342static bool __req_should_be_paused(struct ceph_osd_client *osdc,
1343                                   struct ceph_osd_request *req)
1344{
1345        bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
1346        bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
1347                ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1348        return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
1349                (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
1350}
1351
1352/*
1353 * Calculate mapping of a request to a PG.  Takes tiering into account.
1354 */
1355static int __calc_request_pg(struct ceph_osdmap *osdmap,
1356                             struct ceph_osd_request *req,
1357                             struct ceph_pg *pg_out)
1358{
1359        bool need_check_tiering;
1360
1361        need_check_tiering = false;
1362        if (req->r_target_oloc.pool == -1) {
1363                req->r_target_oloc = req->r_base_oloc; /* struct */
1364                need_check_tiering = true;
1365        }
1366        if (req->r_target_oid.name_len == 0) {
1367                ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
1368                need_check_tiering = true;
1369        }
1370
1371        if (need_check_tiering &&
1372            (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1373                struct ceph_pg_pool_info *pi;
1374
1375                pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
1376                if (pi) {
1377                        if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1378                            pi->read_tier >= 0)
1379                                req->r_target_oloc.pool = pi->read_tier;
1380                        if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1381                            pi->write_tier >= 0)
1382                                req->r_target_oloc.pool = pi->write_tier;
1383                }
1384                /* !pi is caught in ceph_oloc_oid_to_pg() */
1385        }
1386
1387        return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
1388                                   &req->r_target_oid, pg_out);
1389}
1390
1391static void __enqueue_request(struct ceph_osd_request *req)
1392{
1393        struct ceph_osd_client *osdc = req->r_osdc;
1394
1395        dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
1396             req->r_osd ? req->r_osd->o_osd : -1);
1397
1398        if (req->r_osd) {
1399                __remove_osd_from_lru(req->r_osd);
1400                list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1401                list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1402        } else {
1403                list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1404        }
1405}
1406
1407/*
1408 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1409 * (as needed), and set the request r_osd appropriately.  If there is
1410 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
1411 * (unsent, homeless) or leave on in-flight lru.
1412 *
1413 * Return 0 if unchanged, 1 if changed, or negative on error.
1414 *
1415 * Caller should hold map_sem for read and request_mutex.
1416 */
1417static int __map_request(struct ceph_osd_client *osdc,
1418                         struct ceph_osd_request *req, int force_resend)
1419{
1420        struct ceph_pg pgid;
1421        int acting[CEPH_PG_MAX_SIZE];
1422        int num, o;
1423        int err;
1424        bool was_paused;
1425
1426        dout("map_request %p tid %lld\n", req, req->r_tid);
1427
1428        err = __calc_request_pg(osdc->osdmap, req, &pgid);
1429        if (err) {
1430                list_move(&req->r_req_lru_item, &osdc->req_notarget);
1431                return err;
1432        }
1433        req->r_pgid = pgid;
1434
1435        num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
1436        if (num < 0)
1437                num = 0;
1438
1439        was_paused = req->r_paused;
1440        req->r_paused = __req_should_be_paused(osdc, req);
1441        if (was_paused && !req->r_paused)
1442                force_resend = 1;
1443
1444        if ((!force_resend &&
1445             req->r_osd && req->r_osd->o_osd == o &&
1446             req->r_sent >= req->r_osd->o_incarnation &&
1447             req->r_num_pg_osds == num &&
1448             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
1449            (req->r_osd == NULL && o == -1) ||
1450            req->r_paused)
1451                return 0;  /* no change */
1452
1453        dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
1454             req->r_tid, pgid.pool, pgid.seed, o,
1455             req->r_osd ? req->r_osd->o_osd : -1);
1456
1457        /* record full pg acting set */
1458        memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
1459        req->r_num_pg_osds = num;
1460
1461        if (req->r_osd) {
1462                __cancel_request(req);
1463                list_del_init(&req->r_osd_item);
1464                list_del_init(&req->r_linger_osd_item);
1465                req->r_osd = NULL;
1466        }
1467
1468        req->r_osd = __lookup_osd(osdc, o);
1469        if (!req->r_osd && o >= 0) {
1470                err = -ENOMEM;
1471                req->r_osd = create_osd(osdc, o);
1472                if (!req->r_osd) {
1473                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
1474                        goto out;
1475                }
1476
1477                dout("map_request osd %p is osd%d\n", req->r_osd, o);
1478                __insert_osd(osdc, req->r_osd);
1479
1480                ceph_con_open(&req->r_osd->o_con,
1481                              CEPH_ENTITY_TYPE_OSD, o,
1482                              &osdc->osdmap->osd_addr[o]);
1483        }
1484
1485        __enqueue_request(req);
1486        err = 1;   /* osd or pg changed */
1487
1488out:
1489        return err;
1490}
1491
1492/*
1493 * caller should hold map_sem (for read) and request_mutex
1494 */
1495static void __send_request(struct ceph_osd_client *osdc,
1496                           struct ceph_osd_request *req)
1497{
1498        void *p;
1499
1500        dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1501             req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1502             (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1503
1504        /* fill in message content that changes each time we send it */
1505        put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1506        put_unaligned_le32(req->r_flags, req->r_request_flags);
1507        put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
1508        p = req->r_request_pgid;
1509        ceph_encode_64(&p, req->r_pgid.pool);
1510        ceph_encode_32(&p, req->r_pgid.seed);
1511        put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
1512        memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1513               sizeof(req->r_reassert_version));
1514
1515        req->r_stamp = jiffies;
1516        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1517
1518        ceph_msg_get(req->r_request); /* send consumes a ref */
1519
1520        req->r_sent = req->r_osd->o_incarnation;
1521
1522        ceph_con_send(&req->r_osd->o_con, req->r_request);
1523}
1524
1525/*
1526 * Send any requests in the queue (req_unsent).
1527 */
1528static void __send_queued(struct ceph_osd_client *osdc)
1529{
1530        struct ceph_osd_request *req, *tmp;
1531
1532        dout("__send_queued\n");
1533        list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1534                __send_request(osdc, req);
1535}
1536
1537/*
1538 * Caller should hold map_sem for read and request_mutex.
1539 */
1540static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
1541                                     struct ceph_osd_request *req,
1542                                     bool nofail)
1543{
1544        int rc;
1545
1546        __register_request(osdc, req);
1547        req->r_sent = 0;
1548        req->r_got_reply = 0;
1549        rc = __map_request(osdc, req, 0);
1550        if (rc < 0) {
1551                if (nofail) {
1552                        dout("osdc_start_request failed map, "
1553                                " will retry %lld\n", req->r_tid);
1554                        rc = 0;
1555                } else {
1556                        __unregister_request(osdc, req);
1557                }
1558                return rc;
1559        }
1560
1561        if (req->r_osd == NULL) {
1562                dout("send_request %p no up osds in pg\n", req);
1563                ceph_monc_request_next_osdmap(&osdc->client->monc);
1564        } else {
1565                __send_queued(osdc);
1566        }
1567
1568        return 0;
1569}
1570
1571/*
1572 * Timeout callback, called every N seconds when 1 or more osd
1573 * requests has been active for more than N seconds.  When this
1574 * happens, we ping all OSDs with requests who have timed out to
1575 * ensure any communications channel reset is detected.  Reset the
1576 * request timeouts another N seconds in the future as we go.
1577 * Reschedule the timeout event another N seconds in future (unless
1578 * there are no open requests).
1579 */
1580static void handle_timeout(struct work_struct *work)
1581{
1582        struct ceph_osd_client *osdc =
1583                container_of(work, struct ceph_osd_client, timeout_work.work);
1584        struct ceph_options *opts = osdc->client->options;
1585        struct ceph_osd_request *req;
1586        struct ceph_osd *osd;
1587        struct list_head slow_osds;
1588        dout("timeout\n");
1589        down_read(&osdc->map_sem);
1590
1591        ceph_monc_request_next_osdmap(&osdc->client->monc);
1592
1593        mutex_lock(&osdc->request_mutex);
1594
1595        /*
1596         * ping osds that are a bit slow.  this ensures that if there
1597         * is a break in the TCP connection we will notice, and reopen
1598         * a connection with that osd (from the fault callback).
1599         */
1600        INIT_LIST_HEAD(&slow_osds);
1601        list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1602                if (time_before(jiffies,
1603                                req->r_stamp + opts->osd_keepalive_timeout))
1604                        break;
1605
1606                osd = req->r_osd;
1607                BUG_ON(!osd);
1608                dout(" tid %llu is slow, will send keepalive on osd%d\n",
1609                     req->r_tid, osd->o_osd);
1610                list_move_tail(&osd->o_keepalive_item, &slow_osds);
1611        }
1612        while (!list_empty(&slow_osds)) {
1613                osd = list_entry(slow_osds.next, struct ceph_osd,
1614                                 o_keepalive_item);
1615                list_del_init(&osd->o_keepalive_item);
1616                ceph_con_keepalive(&osd->o_con);
1617        }
1618
1619        __schedule_osd_timeout(osdc);
1620        __send_queued(osdc);
1621        mutex_unlock(&osdc->request_mutex);
1622        up_read(&osdc->map_sem);
1623}
1624
1625static void handle_osds_timeout(struct work_struct *work)
1626{
1627        struct ceph_osd_client *osdc =
1628                container_of(work, struct ceph_osd_client,
1629                             osds_timeout_work.work);
1630        unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
1631
1632        dout("osds timeout\n");
1633        down_read(&osdc->map_sem);
1634        remove_old_osds(osdc);
1635        up_read(&osdc->map_sem);
1636
1637        schedule_delayed_work(&osdc->osds_timeout_work,
1638                              round_jiffies_relative(delay));
1639}
1640
1641static int ceph_oloc_decode(void **p, void *end,
1642                            struct ceph_object_locator *oloc)
1643{
1644        u8 struct_v, struct_cv;
1645        u32 len;
1646        void *struct_end;
1647        int ret = 0;
1648
1649        ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1650        struct_v = ceph_decode_8(p);
1651        struct_cv = ceph_decode_8(p);
1652        if (struct_v < 3) {
1653                pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
1654                        struct_v, struct_cv);
1655                goto e_inval;
1656        }
1657        if (struct_cv > 6) {
1658                pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
1659                        struct_v, struct_cv);
1660                goto e_inval;
1661        }
1662        len = ceph_decode_32(p);
1663        ceph_decode_need(p, end, len, e_inval);
1664        struct_end = *p + len;
1665
1666        oloc->pool = ceph_decode_64(p);
1667        *p += 4; /* skip preferred */
1668
1669        len = ceph_decode_32(p);
1670        if (len > 0) {
1671                pr_warn("ceph_object_locator::key is set\n");
1672                goto e_inval;
1673        }
1674
1675        if (struct_v >= 5) {
1676                len = ceph_decode_32(p);
1677                if (len > 0) {
1678                        pr_warn("ceph_object_locator::nspace is set\n");
1679                        goto e_inval;
1680                }
1681        }
1682
1683        if (struct_v >= 6) {
1684                s64 hash = ceph_decode_64(p);
1685                if (hash != -1) {
1686                        pr_warn("ceph_object_locator::hash is set\n");
1687                        goto e_inval;
1688                }
1689        }
1690
1691        /* skip the rest */
1692        *p = struct_end;
1693out:
1694        return ret;
1695
1696e_inval:
1697        ret = -EINVAL;
1698        goto out;
1699}
1700
1701static int ceph_redirect_decode(void **p, void *end,
1702                                struct ceph_request_redirect *redir)
1703{
1704        u8 struct_v, struct_cv;
1705        u32 len;
1706        void *struct_end;
1707        int ret;
1708
1709        ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1710        struct_v = ceph_decode_8(p);
1711        struct_cv = ceph_decode_8(p);
1712        if (struct_cv > 1) {
1713                pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
1714                        struct_v, struct_cv);
1715                goto e_inval;
1716        }
1717        len = ceph_decode_32(p);
1718        ceph_decode_need(p, end, len, e_inval);
1719        struct_end = *p + len;
1720
1721        ret = ceph_oloc_decode(p, end, &redir->oloc);
1722        if (ret)
1723                goto out;
1724
1725        len = ceph_decode_32(p);
1726        if (len > 0) {
1727                pr_warn("ceph_request_redirect::object_name is set\n");
1728                goto e_inval;
1729        }
1730
1731        len = ceph_decode_32(p);
1732        *p += len; /* skip osd_instructions */
1733
1734        /* skip the rest */
1735        *p = struct_end;
1736out:
1737        return ret;
1738
1739e_inval:
1740        ret = -EINVAL;
1741        goto out;
1742}
1743
1744static void complete_request(struct ceph_osd_request *req)
1745{
1746        complete_all(&req->r_safe_completion);  /* fsync waiter */
1747}
1748
1749/*
1750 * handle osd op reply.  either call the callback if it is specified,
1751 * or do the completion to wake up the waiting thread.
1752 */
1753static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1754                         struct ceph_connection *con)
1755{
1756        void *p, *end;
1757        struct ceph_osd_request *req;
1758        struct ceph_request_redirect redir;
1759        u64 tid;
1760        int object_len;
1761        unsigned int numops;
1762        int payload_len, flags;
1763        s32 result;
1764        s32 retry_attempt;
1765        struct ceph_pg pg;
1766        int err;
1767        u32 reassert_epoch;
1768        u64 reassert_version;
1769        u32 osdmap_epoch;
1770        int already_completed;
1771        u32 bytes;
1772        unsigned int i;
1773
1774        tid = le64_to_cpu(msg->hdr.tid);
1775        dout("handle_reply %p tid %llu\n", msg, tid);
1776
1777        p = msg->front.iov_base;
1778        end = p + msg->front.iov_len;
1779
1780        ceph_decode_need(&p, end, 4, bad);
1781        object_len = ceph_decode_32(&p);
1782        ceph_decode_need(&p, end, object_len, bad);
1783        p += object_len;
1784
1785        err = ceph_decode_pgid(&p, end, &pg);
1786        if (err)
1787                goto bad;
1788
1789        ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1790        flags = ceph_decode_64(&p);
1791        result = ceph_decode_32(&p);
1792        reassert_epoch = ceph_decode_32(&p);
1793        reassert_version = ceph_decode_64(&p);
1794        osdmap_epoch = ceph_decode_32(&p);
1795
1796        /* lookup */
1797        down_read(&osdc->map_sem);
1798        mutex_lock(&osdc->request_mutex);
1799        req = __lookup_request(osdc, tid);
1800        if (req == NULL) {
1801                dout("handle_reply tid %llu dne\n", tid);
1802                goto bad_mutex;
1803        }
1804        ceph_osdc_get_request(req);
1805
1806        dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1807             req, result);
1808
1809        ceph_decode_need(&p, end, 4, bad_put);
1810        numops = ceph_decode_32(&p);
1811        if (numops > CEPH_OSD_MAX_OP)
1812                goto bad_put;
1813        if (numops != req->r_num_ops)
1814                goto bad_put;
1815        payload_len = 0;
1816        ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
1817        for (i = 0; i < numops; i++) {
1818                struct ceph_osd_op *op = p;
1819                int len;
1820
1821                len = le32_to_cpu(op->payload_len);
1822                req->r_reply_op_len[i] = len;
1823                dout(" op %d has %d bytes\n", i, len);
1824                payload_len += len;
1825                p += sizeof(*op);
1826        }
1827        bytes = le32_to_cpu(msg->hdr.data_len);
1828        if (payload_len != bytes) {
1829                pr_warn("sum of op payload lens %d != data_len %d\n",
1830                        payload_len, bytes);
1831                goto bad_put;
1832        }
1833
1834        ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
1835        retry_attempt = ceph_decode_32(&p);
1836        for (i = 0; i < numops; i++)
1837                req->r_reply_op_result[i] = ceph_decode_32(&p);
1838
1839        if (le16_to_cpu(msg->hdr.version) >= 6) {
1840                p += 8 + 4; /* skip replay_version */
1841                p += 8; /* skip user_version */
1842
1843                err = ceph_redirect_decode(&p, end, &redir);
1844                if (err)
1845                        goto bad_put;
1846        } else {
1847                redir.oloc.pool = -1;
1848        }
1849
1850        if (redir.oloc.pool != -1) {
1851                dout("redirect pool %lld\n", redir.oloc.pool);
1852
1853                __unregister_request(osdc, req);
1854
1855                req->r_target_oloc = redir.oloc; /* struct */
1856
1857                /*
1858                 * Start redirect requests with nofail=true.  If
1859                 * mapping fails, request will end up on the notarget
1860                 * list, waiting for the new osdmap (which can take
1861                 * a while), even though the original request mapped
1862                 * successfully.  In the future we might want to follow
1863                 * original request's nofail setting here.
1864                 */
1865                err = __ceph_osdc_start_request(osdc, req, true);
1866                BUG_ON(err);
1867
1868                goto out_unlock;
1869        }
1870
1871        already_completed = req->r_got_reply;
1872        if (!req->r_got_reply) {
1873                req->r_result = result;
1874                dout("handle_reply result %d bytes %d\n", req->r_result,
1875                     bytes);
1876                if (req->r_result == 0)
1877                        req->r_result = bytes;
1878
1879                /* in case this is a write and we need to replay, */
1880                req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1881                req->r_reassert_version.version = cpu_to_le64(reassert_version);
1882
1883                req->r_got_reply = 1;
1884        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1885                dout("handle_reply tid %llu dup ack\n", tid);
1886                goto out_unlock;
1887        }
1888
1889        dout("handle_reply tid %llu flags %d\n", tid, flags);
1890
1891        if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1892                __register_linger_request(osdc, req);
1893
1894        /* either this is a read, or we got the safe response */
1895        if (result < 0 ||
1896            (flags & CEPH_OSD_FLAG_ONDISK) ||
1897            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1898                __unregister_request(osdc, req);
1899
1900        mutex_unlock(&osdc->request_mutex);
1901        up_read(&osdc->map_sem);
1902
1903        if (!already_completed) {
1904                if (req->r_unsafe_callback &&
1905                    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
1906                        req->r_unsafe_callback(req, true);
1907                if (req->r_callback)
1908                        req->r_callback(req, msg);
1909                else
1910                        complete_all(&req->r_completion);
1911        }
1912
1913        if (flags & CEPH_OSD_FLAG_ONDISK) {
1914                if (req->r_unsafe_callback && already_completed)
1915                        req->r_unsafe_callback(req, false);
1916                complete_request(req);
1917        }
1918
1919out:
1920        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1921        ceph_osdc_put_request(req);
1922        return;
1923out_unlock:
1924        mutex_unlock(&osdc->request_mutex);
1925        up_read(&osdc->map_sem);
1926        goto out;
1927
1928bad_put:
1929        req->r_result = -EIO;
1930        __unregister_request(osdc, req);
1931        if (req->r_callback)
1932                req->r_callback(req, msg);
1933        else
1934                complete_all(&req->r_completion);
1935        complete_request(req);
1936        ceph_osdc_put_request(req);
1937bad_mutex:
1938        mutex_unlock(&osdc->request_mutex);
1939        up_read(&osdc->map_sem);
1940bad:
1941        pr_err("corrupt osd_op_reply got %d %d\n",
1942               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1943        ceph_msg_dump(msg);
1944}
1945
1946static void reset_changed_osds(struct ceph_osd_client *osdc)
1947{
1948        struct rb_node *p, *n;
1949
1950        dout("%s %p\n", __func__, osdc);
1951        for (p = rb_first(&osdc->osds); p; p = n) {
1952                struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1953
1954                n = rb_next(p);
1955                if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1956                    memcmp(&osd->o_con.peer_addr,
1957                           ceph_osd_addr(osdc->osdmap,
1958                                         osd->o_osd),
1959                           sizeof(struct ceph_entity_addr)) != 0)
1960                        __reset_osd(osdc, osd);
1961        }
1962}
1963
1964/*
1965 * Requeue requests whose mapping to an OSD has changed.  If requests map to
1966 * no osd, request a new map.
1967 *
1968 * Caller should hold map_sem for read.
1969 */
1970static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
1971                          bool force_resend_writes)
1972{
1973        struct ceph_osd_request *req, *nreq;
1974        struct rb_node *p;
1975        int needmap = 0;
1976        int err;
1977        bool force_resend_req;
1978
1979        dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
1980                force_resend_writes ? " (force resend writes)" : "");
1981        mutex_lock(&osdc->request_mutex);
1982        for (p = rb_first(&osdc->requests); p; ) {
1983                req = rb_entry(p, struct ceph_osd_request, r_node);
1984                p = rb_next(p);
1985
1986                /*
1987                 * For linger requests that have not yet been
1988                 * registered, move them to the linger list; they'll
1989                 * be sent to the osd in the loop below.  Unregister
1990                 * the request before re-registering it as a linger
1991                 * request to ensure the __map_request() below
1992                 * will decide it needs to be sent.
1993                 */
1994                if (req->r_linger && list_empty(&req->r_linger_item)) {
1995                        dout("%p tid %llu restart on osd%d\n",
1996                             req, req->r_tid,
1997                             req->r_osd ? req->r_osd->o_osd : -1);
1998                        ceph_osdc_get_request(req);
1999                        __unregister_request(osdc, req);
2000                        __register_linger_request(osdc, req);
2001                        ceph_osdc_put_request(req);
2002                        continue;
2003                }
2004
2005                force_resend_req = force_resend ||
2006                        (force_resend_writes &&
2007                                req->r_flags & CEPH_OSD_FLAG_WRITE);
2008                err = __map_request(osdc, req, force_resend_req);
2009                if (err < 0)
2010                        continue;  /* error */
2011                if (req->r_osd == NULL) {
2012                        dout("%p tid %llu maps to no osd\n", req, req->r_tid);
2013                        needmap++;  /* request a newer map */
2014                } else if (err > 0) {
2015                        if (!req->r_linger) {
2016                                dout("%p tid %llu requeued on osd%d\n", req,
2017                                     req->r_tid,
2018                                     req->r_osd ? req->r_osd->o_osd : -1);
2019                                req->r_flags |= CEPH_OSD_FLAG_RETRY;
2020                        }
2021                }
2022        }
2023
2024        list_for_each_entry_safe(req, nreq, &osdc->req_linger,
2025                                 r_linger_item) {
2026                dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
2027
2028                err = __map_request(osdc, req,
2029                                    force_resend || force_resend_writes);
2030                dout("__map_request returned %d\n", err);
2031                if (err < 0)
2032                        continue;  /* hrm! */
2033                if (req->r_osd == NULL || err > 0) {
2034                        if (req->r_osd == NULL) {
2035                                dout("lingering %p tid %llu maps to no osd\n",
2036                                     req, req->r_tid);
2037                                /*
2038                                 * A homeless lingering request makes
2039                                 * no sense, as it's job is to keep
2040                                 * a particular OSD connection open.
2041                                 * Request a newer map and kick the
2042                                 * request, knowing that it won't be
2043                                 * resent until we actually get a map
2044                                 * that can tell us where to send it.
2045                                 */
2046                                needmap++;
2047                        }
2048
2049                        dout("kicking lingering %p tid %llu osd%d\n", req,
2050                             req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
2051                        __register_request(osdc, req);
2052                        __unregister_linger_request(osdc, req);
2053                }
2054        }
2055        reset_changed_osds(osdc);
2056        mutex_unlock(&osdc->request_mutex);
2057
2058        if (needmap) {
2059                dout("%d requests for down osds, need new map\n", needmap);
2060                ceph_monc_request_next_osdmap(&osdc->client->monc);
2061        }
2062}
2063
2064
2065/*
2066 * Process updated osd map.
2067 *
2068 * The message contains any number of incremental and full maps, normally
2069 * indicating some sort of topology change in the cluster.  Kick requests
2070 * off to different OSDs as needed.
2071 */
2072void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2073{
2074        void *p, *end, *next;
2075        u32 nr_maps, maplen;
2076        u32 epoch;
2077        struct ceph_osdmap *newmap = NULL, *oldmap;
2078        int err;
2079        struct ceph_fsid fsid;
2080        bool was_full;
2081
2082        dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
2083        p = msg->front.iov_base;
2084        end = p + msg->front.iov_len;
2085
2086        /* verify fsid */
2087        ceph_decode_need(&p, end, sizeof(fsid), bad);
2088        ceph_decode_copy(&p, &fsid, sizeof(fsid));
2089        if (ceph_check_fsid(osdc->client, &fsid) < 0)
2090                return;
2091
2092        down_write(&osdc->map_sem);
2093
2094        was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
2095
2096        /* incremental maps */
2097        ceph_decode_32_safe(&p, end, nr_maps, bad);
2098        dout(" %d inc maps\n", nr_maps);
2099        while (nr_maps > 0) {
2100                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2101                epoch = ceph_decode_32(&p);
2102                maplen = ceph_decode_32(&p);
2103                ceph_decode_need(&p, end, maplen, bad);
2104                next = p + maplen;
2105                if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
2106                        dout("applying incremental map %u len %d\n",
2107                             epoch, maplen);
2108                        newmap = osdmap_apply_incremental(&p, next,
2109                                                          osdc->osdmap,
2110                                                          &osdc->client->msgr);
2111                        if (IS_ERR(newmap)) {
2112                                err = PTR_ERR(newmap);
2113                                goto bad;
2114                        }
2115                        BUG_ON(!newmap);
2116                        if (newmap != osdc->osdmap) {
2117                                ceph_osdmap_destroy(osdc->osdmap);
2118                                osdc->osdmap = newmap;
2119                        }
2120                        was_full = was_full ||
2121                                ceph_osdmap_flag(osdc->osdmap,
2122                                                 CEPH_OSDMAP_FULL);
2123                        kick_requests(osdc, 0, was_full);
2124                } else {
2125                        dout("ignoring incremental map %u len %d\n",
2126                             epoch, maplen);
2127                }
2128                p = next;
2129                nr_maps--;
2130        }
2131        if (newmap)
2132                goto done;
2133
2134        /* full maps */
2135        ceph_decode_32_safe(&p, end, nr_maps, bad);
2136        dout(" %d full maps\n", nr_maps);
2137        while (nr_maps) {
2138                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2139                epoch = ceph_decode_32(&p);
2140                maplen = ceph_decode_32(&p);
2141                ceph_decode_need(&p, end, maplen, bad);
2142                if (nr_maps > 1) {
2143                        dout("skipping non-latest full map %u len %d\n",
2144                             epoch, maplen);
2145                } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
2146                        dout("skipping full map %u len %d, "
2147                             "older than our %u\n", epoch, maplen,
2148                             osdc->osdmap->epoch);
2149                } else {
2150                        int skipped_map = 0;
2151
2152                        dout("taking full map %u len %d\n", epoch, maplen);
2153                        newmap = ceph_osdmap_decode(&p, p+maplen);
2154                        if (IS_ERR(newmap)) {
2155                                err = PTR_ERR(newmap);
2156                                goto bad;
2157                        }
2158                        BUG_ON(!newmap);
2159                        oldmap = osdc->osdmap;
2160                        osdc->osdmap = newmap;
2161                        if (oldmap) {
2162                                if (oldmap->epoch + 1 < newmap->epoch)
2163                                        skipped_map = 1;
2164                                ceph_osdmap_destroy(oldmap);
2165                        }
2166                        was_full = was_full ||
2167                                ceph_osdmap_flag(osdc->osdmap,
2168                                                 CEPH_OSDMAP_FULL);
2169                        kick_requests(osdc, skipped_map, was_full);
2170                }
2171                p += maplen;
2172                nr_maps--;
2173        }
2174
2175        if (!osdc->osdmap)
2176                goto bad;
2177done:
2178        downgrade_write(&osdc->map_sem);
2179        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
2180
2181        /*
2182         * subscribe to subsequent osdmap updates if full to ensure
2183         * we find out when we are no longer full and stop returning
2184         * ENOSPC.
2185         */
2186        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
2187                ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
2188                ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
2189                ceph_monc_request_next_osdmap(&osdc->client->monc);
2190
2191        mutex_lock(&osdc->request_mutex);
2192        __send_queued(osdc);
2193        mutex_unlock(&osdc->request_mutex);
2194        up_read(&osdc->map_sem);
2195        wake_up_all(&osdc->client->auth_wq);
2196        return;
2197
2198bad:
2199        pr_err("osdc handle_map corrupt msg\n");
2200        ceph_msg_dump(msg);
2201        up_write(&osdc->map_sem);
2202}
2203
2204/*
2205 * watch/notify callback event infrastructure
2206 *
2207 * These callbacks are used both for watch and notify operations.
2208 */
2209static void __release_event(struct kref *kref)
2210{
2211        struct ceph_osd_event *event =
2212                container_of(kref, struct ceph_osd_event, kref);
2213
2214        dout("__release_event %p\n", event);
2215        kfree(event);
2216}
2217
2218static void get_event(struct ceph_osd_event *event)
2219{
2220        kref_get(&event->kref);
2221}
2222
2223void ceph_osdc_put_event(struct ceph_osd_event *event)
2224{
2225        kref_put(&event->kref, __release_event);
2226}
2227EXPORT_SYMBOL(ceph_osdc_put_event);
2228
2229static void __insert_event(struct ceph_osd_client *osdc,
2230                             struct ceph_osd_event *new)
2231{
2232        struct rb_node **p = &osdc->event_tree.rb_node;
2233        struct rb_node *parent = NULL;
2234        struct ceph_osd_event *event = NULL;
2235
2236        while (*p) {
2237                parent = *p;
2238                event = rb_entry(parent, struct ceph_osd_event, node);
2239                if (new->cookie < event->cookie)
2240                        p = &(*p)->rb_left;
2241                else if (new->cookie > event->cookie)
2242                        p = &(*p)->rb_right;
2243                else
2244                        BUG();
2245        }
2246
2247        rb_link_node(&new->node, parent, p);
2248        rb_insert_color(&new->node, &osdc->event_tree);
2249}
2250
2251static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
2252                                                u64 cookie)
2253{
2254        struct rb_node **p = &osdc->event_tree.rb_node;
2255        struct rb_node *parent = NULL;
2256        struct ceph_osd_event *event = NULL;
2257
2258        while (*p) {
2259                parent = *p;
2260                event = rb_entry(parent, struct ceph_osd_event, node);
2261                if (cookie < event->cookie)
2262                        p = &(*p)->rb_left;
2263                else if (cookie > event->cookie)
2264                        p = &(*p)->rb_right;
2265                else
2266                        return event;
2267        }
2268        return NULL;
2269}
2270
2271static void __remove_event(struct ceph_osd_event *event)
2272{
2273        struct ceph_osd_client *osdc = event->osdc;
2274
2275        if (!RB_EMPTY_NODE(&event->node)) {
2276                dout("__remove_event removed %p\n", event);
2277                rb_erase(&event->node, &osdc->event_tree);
2278                ceph_osdc_put_event(event);
2279        } else {
2280                dout("__remove_event didn't remove %p\n", event);
2281        }
2282}
2283
2284int ceph_osdc_create_event(struct ceph_osd_client *osdc,
2285                           void (*event_cb)(u64, u64, u8, void *),
2286                           void *data, struct ceph_osd_event **pevent)
2287{
2288        struct ceph_osd_event *event;
2289
2290        event = kmalloc(sizeof(*event), GFP_NOIO);
2291        if (!event)
2292                return -ENOMEM;
2293
2294        dout("create_event %p\n", event);
2295        event->cb = event_cb;
2296        event->one_shot = 0;
2297        event->data = data;
2298        event->osdc = osdc;
2299        INIT_LIST_HEAD(&event->osd_node);
2300        RB_CLEAR_NODE(&event->node);
2301        kref_init(&event->kref);   /* one ref for us */
2302        kref_get(&event->kref);    /* one ref for the caller */
2303
2304        spin_lock(&osdc->event_lock);
2305        event->cookie = ++osdc->event_count;
2306        __insert_event(osdc, event);
2307        spin_unlock(&osdc->event_lock);
2308
2309        *pevent = event;
2310        return 0;
2311}
2312EXPORT_SYMBOL(ceph_osdc_create_event);
2313
2314void ceph_osdc_cancel_event(struct ceph_osd_event *event)
2315{
2316        struct ceph_osd_client *osdc = event->osdc;
2317
2318        dout("cancel_event %p\n", event);
2319        spin_lock(&osdc->event_lock);
2320        __remove_event(event);
2321        spin_unlock(&osdc->event_lock);
2322        ceph_osdc_put_event(event); /* caller's */
2323}
2324EXPORT_SYMBOL(ceph_osdc_cancel_event);
2325
2326
2327static void do_event_work(struct work_struct *work)
2328{
2329        struct ceph_osd_event_work *event_work =
2330                container_of(work, struct ceph_osd_event_work, work);
2331        struct ceph_osd_event *event = event_work->event;
2332        u64 ver = event_work->ver;
2333        u64 notify_id = event_work->notify_id;
2334        u8 opcode = event_work->opcode;
2335
2336        dout("do_event_work completing %p\n", event);
2337        event->cb(ver, notify_id, opcode, event->data);
2338        dout("do_event_work completed %p\n", event);
2339        ceph_osdc_put_event(event);
2340        kfree(event_work);
2341}
2342
2343
2344/*
2345 * Process osd watch notifications
2346 */
2347static void handle_watch_notify(struct ceph_osd_client *osdc,
2348                                struct ceph_msg *msg)
2349{
2350        void *p, *end;
2351        u8 proto_ver;
2352        u64 cookie, ver, notify_id;
2353        u8 opcode;
2354        struct ceph_osd_event *event;
2355        struct ceph_osd_event_work *event_work;
2356
2357        p = msg->front.iov_base;
2358        end = p + msg->front.iov_len;
2359
2360        ceph_decode_8_safe(&p, end, proto_ver, bad);
2361        ceph_decode_8_safe(&p, end, opcode, bad);
2362        ceph_decode_64_safe(&p, end, cookie, bad);
2363        ceph_decode_64_safe(&p, end, ver, bad);
2364        ceph_decode_64_safe(&p, end, notify_id, bad);
2365
2366        spin_lock(&osdc->event_lock);
2367        event = __find_event(osdc, cookie);
2368        if (event) {
2369                BUG_ON(event->one_shot);
2370                get_event(event);
2371        }
2372        spin_unlock(&osdc->event_lock);
2373        dout("handle_watch_notify cookie %lld ver %lld event %p\n",
2374             cookie, ver, event);
2375        if (event) {
2376                event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
2377                if (!event_work) {
2378                        pr_err("couldn't allocate event_work\n");
2379                        ceph_osdc_put_event(event);
2380                        return;
2381                }
2382                INIT_WORK(&event_work->work, do_event_work);
2383                event_work->event = event;
2384                event_work->ver = ver;
2385                event_work->notify_id = notify_id;
2386                event_work->opcode = opcode;
2387
2388                queue_work(osdc->notify_wq, &event_work->work);
2389        }
2390
2391        return;
2392
2393bad:
2394        pr_err("osdc handle_watch_notify corrupt msg\n");
2395}
2396
2397/*
2398 * build new request AND message
2399 *
2400 */
2401void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2402                                struct ceph_snap_context *snapc, u64 snap_id,
2403                                struct timespec *mtime)
2404{
2405        struct ceph_msg *msg = req->r_request;
2406        void *p;
2407        size_t msg_size;
2408        int flags = req->r_flags;
2409        u64 data_len;
2410        unsigned int i;
2411
2412        req->r_snapid = snap_id;
2413        req->r_snapc = ceph_get_snap_context(snapc);
2414
2415        /* encode request */
2416        msg->hdr.version = cpu_to_le16(4);
2417
2418        p = msg->front.iov_base;
2419        ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
2420        req->r_request_osdmap_epoch = p;
2421        p += 4;
2422        req->r_request_flags = p;
2423        p += 4;
2424        if (req->r_flags & CEPH_OSD_FLAG_WRITE)
2425                ceph_encode_timespec(p, mtime);
2426        p += sizeof(struct ceph_timespec);
2427        req->r_request_reassert_version = p;
2428        p += sizeof(struct ceph_eversion); /* will get filled in */
2429
2430        /* oloc */
2431        ceph_encode_8(&p, 4);
2432        ceph_encode_8(&p, 4);
2433        ceph_encode_32(&p, 8 + 4 + 4);
2434        req->r_request_pool = p;
2435        p += 8;
2436        ceph_encode_32(&p, -1);  /* preferred */
2437        ceph_encode_32(&p, 0);   /* key len */
2438
2439        ceph_encode_8(&p, 1);
2440        req->r_request_pgid = p;
2441        p += 8 + 4;
2442        ceph_encode_32(&p, -1);  /* preferred */
2443
2444        /* oid */
2445        ceph_encode_32(&p, req->r_base_oid.name_len);
2446        memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
2447        dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
2448             req->r_base_oid.name, req->r_base_oid.name_len);
2449        p += req->r_base_oid.name_len;
2450
2451        /* ops--can imply data */
2452        ceph_encode_16(&p, (u16)req->r_num_ops);
2453        data_len = 0;
2454        for (i = 0; i < req->r_num_ops; i++) {
2455                data_len += osd_req_encode_op(req, p, i);
2456                p += sizeof(struct ceph_osd_op);
2457        }
2458
2459        /* snaps */
2460        ceph_encode_64(&p, req->r_snapid);
2461        ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
2462        ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
2463        if (req->r_snapc) {
2464                for (i = 0; i < snapc->num_snaps; i++) {
2465                        ceph_encode_64(&p, req->r_snapc->snaps[i]);
2466                }
2467        }
2468
2469        req->r_request_attempts = p;
2470        p += 4;
2471
2472        /* data */
2473        if (flags & CEPH_OSD_FLAG_WRITE) {
2474                u16 data_off;
2475
2476                /*
2477                 * The header "data_off" is a hint to the receiver
2478                 * allowing it to align received data into its
2479                 * buffers such that there's no need to re-copy
2480                 * it before writing it to disk (direct I/O).
2481                 */
2482                data_off = (u16) (off & 0xffff);
2483                req->r_request->hdr.data_off = cpu_to_le16(data_off);
2484        }
2485        req->r_request->hdr.data_len = cpu_to_le32(data_len);
2486
2487        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
2488        msg_size = p - msg->front.iov_base;
2489        msg->front.iov_len = msg_size;
2490        msg->hdr.front_len = cpu_to_le32(msg_size);
2491
2492        dout("build_request msg_size was %d\n", (int)msg_size);
2493}
2494EXPORT_SYMBOL(ceph_osdc_build_request);
2495
2496/*
2497 * Register request, send initial attempt.
2498 */
2499int ceph_osdc_start_request(struct ceph_osd_client *osdc,
2500                            struct ceph_osd_request *req,
2501                            bool nofail)
2502{
2503        int rc;
2504
2505        down_read(&osdc->map_sem);
2506        mutex_lock(&osdc->request_mutex);
2507
2508        rc = __ceph_osdc_start_request(osdc, req, nofail);
2509
2510        mutex_unlock(&osdc->request_mutex);
2511        up_read(&osdc->map_sem);
2512
2513        return rc;
2514}
2515EXPORT_SYMBOL(ceph_osdc_start_request);
2516
2517/*
2518 * Unregister a registered request.  The request is not completed (i.e.
2519 * no callbacks or wakeups) - higher layers are supposed to know what
2520 * they are canceling.
2521 */
2522void ceph_osdc_cancel_request(struct ceph_osd_request *req)
2523{
2524        struct ceph_osd_client *osdc = req->r_osdc;
2525
2526        mutex_lock(&osdc->request_mutex);
2527        if (req->r_linger)
2528                __unregister_linger_request(osdc, req);
2529        __unregister_request(osdc, req);
2530        mutex_unlock(&osdc->request_mutex);
2531
2532        dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
2533}
2534EXPORT_SYMBOL(ceph_osdc_cancel_request);
2535
2536/*
2537 * wait for a request to complete
2538 */
2539int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
2540                           struct ceph_osd_request *req)
2541{
2542        int rc;
2543
2544        dout("%s %p tid %llu\n", __func__, req, req->r_tid);
2545
2546        rc = wait_for_completion_interruptible(&req->r_completion);
2547        if (rc < 0) {
2548                dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
2549                ceph_osdc_cancel_request(req);
2550                complete_request(req);
2551                return rc;
2552        }
2553
2554        dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
2555             req->r_result);
2556        return req->r_result;
2557}
2558EXPORT_SYMBOL(ceph_osdc_wait_request);
2559
2560/*
2561 * sync - wait for all in-flight requests to flush.  avoid starvation.
2562 */
2563void ceph_osdc_sync(struct ceph_osd_client *osdc)
2564{
2565        struct ceph_osd_request *req;
2566        u64 last_tid, next_tid = 0;
2567
2568        mutex_lock(&osdc->request_mutex);
2569        last_tid = osdc->last_tid;
2570        while (1) {
2571                req = __lookup_request_ge(osdc, next_tid);
2572                if (!req)
2573                        break;
2574                if (req->r_tid > last_tid)
2575                        break;
2576
2577                next_tid = req->r_tid + 1;
2578                if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
2579                        continue;
2580
2581                ceph_osdc_get_request(req);
2582                mutex_unlock(&osdc->request_mutex);
2583                dout("sync waiting on tid %llu (last is %llu)\n",
2584                     req->r_tid, last_tid);
2585                wait_for_completion(&req->r_safe_completion);
2586                mutex_lock(&osdc->request_mutex);
2587                ceph_osdc_put_request(req);
2588        }
2589        mutex_unlock(&osdc->request_mutex);
2590        dout("sync done (thru tid %llu)\n", last_tid);
2591}
2592EXPORT_SYMBOL(ceph_osdc_sync);
2593
2594/*
2595 * Call all pending notify callbacks - for use after a watch is
2596 * unregistered, to make sure no more callbacks for it will be invoked
2597 */
2598void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
2599{
2600        flush_workqueue(osdc->notify_wq);
2601}
2602EXPORT_SYMBOL(ceph_osdc_flush_notifies);
2603
2604
2605/*
2606 * init, shutdown
2607 */
2608int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2609{
2610        int err;
2611
2612        dout("init\n");
2613        osdc->client = client;
2614        osdc->osdmap = NULL;
2615        init_rwsem(&osdc->map_sem);
2616        init_completion(&osdc->map_waiters);
2617        osdc->last_requested_map = 0;
2618        mutex_init(&osdc->request_mutex);
2619        osdc->last_tid = 0;
2620        osdc->osds = RB_ROOT;
2621        INIT_LIST_HEAD(&osdc->osd_lru);
2622        osdc->requests = RB_ROOT;
2623        INIT_LIST_HEAD(&osdc->req_lru);
2624        INIT_LIST_HEAD(&osdc->req_unsent);
2625        INIT_LIST_HEAD(&osdc->req_notarget);
2626        INIT_LIST_HEAD(&osdc->req_linger);
2627        osdc->num_requests = 0;
2628        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
2629        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
2630        spin_lock_init(&osdc->event_lock);
2631        osdc->event_tree = RB_ROOT;
2632        osdc->event_count = 0;
2633
2634        schedule_delayed_work(&osdc->osds_timeout_work,
2635            round_jiffies_relative(osdc->client->options->osd_idle_ttl));
2636
2637        err = -ENOMEM;
2638        osdc->req_mempool = mempool_create_kmalloc_pool(10,
2639                                        sizeof(struct ceph_osd_request));
2640        if (!osdc->req_mempool)
2641                goto out;
2642
2643        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
2644                                OSD_OP_FRONT_LEN, 10, true,
2645                                "osd_op");
2646        if (err < 0)
2647                goto out_mempool;
2648        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
2649                                OSD_OPREPLY_FRONT_LEN, 10, true,
2650                                "osd_op_reply");
2651        if (err < 0)
2652                goto out_msgpool;
2653
2654        err = -ENOMEM;
2655        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
2656        if (!osdc->notify_wq)
2657                goto out_msgpool_reply;
2658
2659        return 0;
2660
2661out_msgpool_reply:
2662        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
2663out_msgpool:
2664        ceph_msgpool_destroy(&osdc->msgpool_op);
2665out_mempool:
2666        mempool_destroy(osdc->req_mempool);
2667out:
2668        return err;
2669}
2670
2671void ceph_osdc_stop(struct ceph_osd_client *osdc)
2672{
2673        flush_workqueue(osdc->notify_wq);
2674        destroy_workqueue(osdc->notify_wq);
2675        cancel_delayed_work_sync(&osdc->timeout_work);
2676        cancel_delayed_work_sync(&osdc->osds_timeout_work);
2677        if (osdc->osdmap) {
2678                ceph_osdmap_destroy(osdc->osdmap);
2679                osdc->osdmap = NULL;
2680        }
2681        remove_all_osds(osdc);
2682        mempool_destroy(osdc->req_mempool);
2683        ceph_msgpool_destroy(&osdc->msgpool_op);
2684        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
2685}
2686
2687/*
2688 * Read some contiguous pages.  If we cross a stripe boundary, shorten
2689 * *plen.  Return number of bytes read, or error.
2690 */
2691int ceph_osdc_readpages(struct ceph_osd_client *osdc,
2692                        struct ceph_vino vino, struct ceph_file_layout *layout,
2693                        u64 off, u64 *plen,
2694                        u32 truncate_seq, u64 truncate_size,
2695                        struct page **pages, int num_pages, int page_align)
2696{
2697        struct ceph_osd_request *req;
2698        int rc = 0;
2699
2700        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
2701             vino.snap, off, *plen);
2702        req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
2703                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
2704                                    NULL, truncate_seq, truncate_size,
2705                                    false);
2706        if (IS_ERR(req))
2707                return PTR_ERR(req);
2708
2709        /* it may be a short read due to an object boundary */
2710
2711        osd_req_op_extent_osd_data_pages(req, 0,
2712                                pages, *plen, page_align, false, false);
2713
2714        dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
2715             off, *plen, *plen, page_align);
2716
2717        ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
2718
2719        rc = ceph_osdc_start_request(osdc, req, false);
2720        if (!rc)
2721                rc = ceph_osdc_wait_request(osdc, req);
2722
2723        ceph_osdc_put_request(req);
2724        dout("readpages result %d\n", rc);
2725        return rc;
2726}
2727EXPORT_SYMBOL(ceph_osdc_readpages);
2728
2729/*
2730 * do a synchronous write on N pages
2731 */
2732int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2733                         struct ceph_file_layout *layout,
2734                         struct ceph_snap_context *snapc,
2735                         u64 off, u64 len,
2736                         u32 truncate_seq, u64 truncate_size,
2737                         struct timespec *mtime,
2738                         struct page **pages, int num_pages)
2739{
2740        struct ceph_osd_request *req;
2741        int rc = 0;
2742        int page_align = off & ~PAGE_MASK;
2743
2744        BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
2745        req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
2746                                    CEPH_OSD_OP_WRITE,
2747                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
2748                                    snapc, truncate_seq, truncate_size,
2749                                    true);
2750        if (IS_ERR(req))
2751                return PTR_ERR(req);
2752
2753        /* it may be a short write due to an object boundary */
2754        osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
2755                                false, false);
2756        dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
2757
2758        ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
2759
2760        rc = ceph_osdc_start_request(osdc, req, true);
2761        if (!rc)
2762                rc = ceph_osdc_wait_request(osdc, req);
2763
2764        ceph_osdc_put_request(req);
2765        if (rc == 0)
2766                rc = len;
2767        dout("writepages result %d\n", rc);
2768        return rc;
2769}
2770EXPORT_SYMBOL(ceph_osdc_writepages);
2771
2772int ceph_osdc_setup(void)
2773{
2774        BUG_ON(ceph_osd_request_cache);
2775        ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
2776                                        sizeof (struct ceph_osd_request),
2777                                        __alignof__(struct ceph_osd_request),
2778                                        0, NULL);
2779
2780        return ceph_osd_request_cache ? 0 : -ENOMEM;
2781}
2782EXPORT_SYMBOL(ceph_osdc_setup);
2783
2784void ceph_osdc_cleanup(void)
2785{
2786        BUG_ON(!ceph_osd_request_cache);
2787        kmem_cache_destroy(ceph_osd_request_cache);
2788        ceph_osd_request_cache = NULL;
2789}
2790EXPORT_SYMBOL(ceph_osdc_cleanup);
2791
2792/*
2793 * handle incoming message
2794 */
2795static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2796{
2797        struct ceph_osd *osd = con->private;
2798        struct ceph_osd_client *osdc;
2799        int type = le16_to_cpu(msg->hdr.type);
2800
2801        if (!osd)
2802                goto out;
2803        osdc = osd->o_osdc;
2804
2805        switch (type) {
2806        case CEPH_MSG_OSD_MAP:
2807                ceph_osdc_handle_map(osdc, msg);
2808                break;
2809        case CEPH_MSG_OSD_OPREPLY:
2810                handle_reply(osdc, msg, con);
2811                break;
2812        case CEPH_MSG_WATCH_NOTIFY:
2813                handle_watch_notify(osdc, msg);
2814                break;
2815
2816        default:
2817                pr_err("received unknown message type %d %s\n", type,
2818                       ceph_msg_type_name(type));
2819        }
2820out:
2821        ceph_msg_put(msg);
2822}
2823
2824/*
2825 * Lookup and return message for incoming reply.  Don't try to do
2826 * anything about a larger than preallocated data portion of the
2827 * message at the moment - for now, just skip the message.
2828 */
2829static struct ceph_msg *get_reply(struct ceph_connection *con,
2830                                  struct ceph_msg_header *hdr,
2831                                  int *skip)
2832{
2833        struct ceph_osd *osd = con->private;
2834        struct ceph_osd_client *osdc = osd->o_osdc;
2835        struct ceph_msg *m;
2836        struct ceph_osd_request *req;
2837        int front_len = le32_to_cpu(hdr->front_len);
2838        int data_len = le32_to_cpu(hdr->data_len);
2839        u64 tid;
2840
2841        tid = le64_to_cpu(hdr->tid);
2842        mutex_lock(&osdc->request_mutex);
2843        req = __lookup_request(osdc, tid);
2844        if (!req) {
2845                pr_warn("%s osd%d tid %llu unknown, skipping\n",
2846                        __func__, osd->o_osd, tid);
2847                m = NULL;
2848                *skip = 1;
2849                goto out;
2850        }
2851
2852        if (req->r_reply->con)
2853                dout("%s revoking msg %p from old con %p\n", __func__,
2854                     req->r_reply, req->r_reply->con);
2855        ceph_msg_revoke_incoming(req->r_reply);
2856
2857        if (front_len > req->r_reply->front_alloc_len) {
2858                pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
2859                        __func__, osd->o_osd, req->r_tid, front_len,
2860                        req->r_reply->front_alloc_len);
2861                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
2862                                 false);
2863                if (!m)
2864                        goto out;
2865                ceph_msg_put(req->r_reply);
2866                req->r_reply = m;
2867        }
2868
2869        if (data_len > req->r_reply->data_length) {
2870                pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
2871                        __func__, osd->o_osd, req->r_tid, data_len,
2872                        req->r_reply->data_length);
2873                m = NULL;
2874                *skip = 1;
2875                goto out;
2876        }
2877
2878        m = ceph_msg_get(req->r_reply);
2879        dout("get_reply tid %lld %p\n", tid, m);
2880
2881out:
2882        mutex_unlock(&osdc->request_mutex);
2883        return m;
2884}
2885
2886static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2887                                  struct ceph_msg_header *hdr,
2888                                  int *skip)
2889{
2890        struct ceph_osd *osd = con->private;
2891        int type = le16_to_cpu(hdr->type);
2892        int front = le32_to_cpu(hdr->front_len);
2893
2894        *skip = 0;
2895        switch (type) {
2896        case CEPH_MSG_OSD_MAP:
2897        case CEPH_MSG_WATCH_NOTIFY:
2898                return ceph_msg_new(type, front, GFP_NOFS, false);
2899        case CEPH_MSG_OSD_OPREPLY:
2900                return get_reply(con, hdr, skip);
2901        default:
2902                pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2903                        osd->o_osd);
2904                *skip = 1;
2905                return NULL;
2906        }
2907}
2908
2909/*
2910 * Wrappers to refcount containing ceph_osd struct
2911 */
2912static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2913{
2914        struct ceph_osd *osd = con->private;
2915        if (get_osd(osd))
2916                return con;
2917        return NULL;
2918}
2919
2920static void put_osd_con(struct ceph_connection *con)
2921{
2922        struct ceph_osd *osd = con->private;
2923        put_osd(osd);
2924}
2925
2926/*
2927 * authentication
2928 */
2929/*
2930 * Note: returned pointer is the address of a structure that's
2931 * managed separately.  Caller must *not* attempt to free it.
2932 */
2933static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2934                                        int *proto, int force_new)
2935{
2936        struct ceph_osd *o = con->private;
2937        struct ceph_osd_client *osdc = o->o_osdc;
2938        struct ceph_auth_client *ac = osdc->client->monc.auth;
2939        struct ceph_auth_handshake *auth = &o->o_auth;
2940
2941        if (force_new && auth->authorizer) {
2942                ceph_auth_destroy_authorizer(ac, auth->authorizer);
2943                auth->authorizer = NULL;
2944        }
2945        if (!auth->authorizer) {
2946                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2947                                                      auth);
2948                if (ret)
2949                        return ERR_PTR(ret);
2950        } else {
2951                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2952                                                     auth);
2953                if (ret)
2954                        return ERR_PTR(ret);
2955        }
2956        *proto = ac->protocol;
2957
2958        return auth;
2959}
2960
2961
2962static int verify_authorizer_reply(struct ceph_connection *con, int len)
2963{
2964        struct ceph_osd *o = con->private;
2965        struct ceph_osd_client *osdc = o->o_osdc;
2966        struct ceph_auth_client *ac = osdc->client->monc.auth;
2967
2968        return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2969}
2970
2971static int invalidate_authorizer(struct ceph_connection *con)
2972{
2973        struct ceph_osd *o = con->private;
2974        struct ceph_osd_client *osdc = o->o_osdc;
2975        struct ceph_auth_client *ac = osdc->client->monc.auth;
2976
2977        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2978        return ceph_monc_validate_auth(&osdc->client->monc);
2979}
2980
2981static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
2982{
2983        struct ceph_osd *o = con->private;
2984        struct ceph_auth_handshake *auth = &o->o_auth;
2985        return ceph_auth_sign_message(auth, msg);
2986}
2987
2988static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
2989{
2990        struct ceph_osd *o = con->private;
2991        struct ceph_auth_handshake *auth = &o->o_auth;
2992        return ceph_auth_check_message_signature(auth, msg);
2993}
2994
2995static const struct ceph_connection_operations osd_con_ops = {
2996        .get = get_osd_con,
2997        .put = put_osd_con,
2998        .dispatch = dispatch,
2999        .get_authorizer = get_authorizer,
3000        .verify_authorizer_reply = verify_authorizer_reply,
3001        .invalidate_authorizer = invalidate_authorizer,
3002        .alloc_msg = alloc_msg,
3003        .sign_message = sign_message,
3004        .check_message_signature = check_message_signature,
3005        .fault = osd_reset,
3006};
3007