linux/drivers/block/rbd.c
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/cls_lock_client.h>
  35#include <linux/ceph/striper.h>
  36#include <linux/ceph/decode.h>
  37#include <linux/fs_parser.h>
  38#include <linux/bsearch.h>
  39
  40#include <linux/kernel.h>
  41#include <linux/device.h>
  42#include <linux/module.h>
  43#include <linux/blk-mq.h>
  44#include <linux/fs.h>
  45#include <linux/blkdev.h>
  46#include <linux/slab.h>
  47#include <linux/idr.h>
  48#include <linux/workqueue.h>
  49
  50#include "rbd_types.h"
  51
  52#define RBD_DEBUG       /* Activate rbd_assert() calls */
  53
  54/*
  55 * Increment the given counter and return its updated value.
  56 * If the counter is already 0 it will not be incremented.
  57 * If the counter is already at its maximum value returns
  58 * -EINVAL without updating it.
  59 */
  60static int atomic_inc_return_safe(atomic_t *v)
  61{
  62        unsigned int counter;
  63
  64        counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
  65        if (counter <= (unsigned int)INT_MAX)
  66                return (int)counter;
  67
  68        atomic_dec(v);
  69
  70        return -EINVAL;
  71}
  72
  73/* Decrement the counter.  Return the resulting value, or -EINVAL */
  74static int atomic_dec_return_safe(atomic_t *v)
  75{
  76        int counter;
  77
  78        counter = atomic_dec_return(v);
  79        if (counter >= 0)
  80                return counter;
  81
  82        atomic_inc(v);
  83
  84        return -EINVAL;
  85}
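
/*
 * These two helpers back a reference count that cannot be re-taken once
 * it has dropped to zero, and that reports overflow/underflow as
 * -EINVAL.  A minimal usage sketch (the real users are
 * rbd_dev_parent_get() and rbd_dev_parent_put() below):
 *
 *	if (atomic_inc_return_safe(&ref) > 0) {
 *		... use the referenced object ...
 *		atomic_dec_return_safe(&ref);
 *	}
 */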
  86
  87#define RBD_DRV_NAME "rbd"
  88
  89#define RBD_MINORS_PER_MAJOR            256
  90#define RBD_SINGLE_MAJOR_PART_SHIFT     4
  91
  92#define RBD_MAX_PARENT_CHAIN_LEN        16
  93
  94#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
  95#define RBD_MAX_SNAP_NAME_LEN   \
  96                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
  97
  98#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
  99
 100#define RBD_SNAP_HEAD_NAME      "-"
 101
 102#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
 103
 104/* This allows a single page to hold an image name sent by an OSD */
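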
 105#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
 106#define RBD_IMAGE_ID_LEN_MAX    64
 107
 108#define RBD_OBJ_PREFIX_LEN_MAX  64
 109
 110#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
 111#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
 112
 113/* Feature bits */
 114
 115#define RBD_FEATURE_LAYERING            (1ULL<<0)
 116#define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
 117#define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
 118#define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
 119#define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
 120#define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
 121#define RBD_FEATURE_DATA_POOL           (1ULL<<7)
 122#define RBD_FEATURE_OPERATIONS          (1ULL<<8)
 123
 124#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
 125                                 RBD_FEATURE_STRIPINGV2 |       \
 126                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
 127                                 RBD_FEATURE_OBJECT_MAP |       \
 128                                 RBD_FEATURE_FAST_DIFF |        \
 129                                 RBD_FEATURE_DEEP_FLATTEN |     \
 130                                 RBD_FEATURE_DATA_POOL |        \
 131                                 RBD_FEATURE_OPERATIONS)
 132
 133/* Features supported by this (client software) implementation. */
 134
 135#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
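
/*
 * Mapping an image that has feature bits outside RBD_FEATURES_SUPPORTED
 * is refused.  Conceptually (illustrative sketch; the real check lives
 * in the format 2 feature probe):
 *
 *	u64 unsup = features & ~RBD_FEATURES_SUPPORTED;
 *
 *	if (unsup)
 *		return -ENXIO;
 */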
 136
 137/*
 138 * An RBD device name will be "rbd#", where the "rbd" comes from
 139 * RBD_DRV_NAME above, and # is a unique integer identifier.
 140 */
 141#define DEV_NAME_LEN            32
 142
 143/*
 144 * block device image metadata (in-memory version)
 145 */
 146struct rbd_image_header {
 147        /* These six fields never change for a given rbd image */
 148        char *object_prefix;
 149        __u8 obj_order;
 150        u64 stripe_unit;
 151        u64 stripe_count;
 152        s64 data_pool_id;
 153        u64 features;           /* Might be changeable someday? */
 154
 155        /* The remaining fields need to be updated occasionally */
 156        u64 image_size;
 157        struct ceph_snap_context *snapc;
 158        char *snap_names;       /* format 1 only */
 159        u64 *snap_sizes;        /* format 1 only */
 160};
 161
 162/*
 163 * An rbd image specification.
 164 *
 165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 166 * identify an image.  Each rbd_dev structure includes a pointer to
 167 * an rbd_spec structure that encapsulates this identity.
 168 *
 169 * Each of the ids in an rbd_spec has an associated name.  For a
 170 * user-mapped image, the names are supplied and the ids associated
 171 * with them are looked up.  For a layered image, a parent image is
 172 * defined by the tuple, and the names are looked up.
 173 *
 174 * An rbd_dev structure contains a parent_spec pointer which is
 175 * non-null if the image it represents is a child in a layered
 176 * image.  This pointer will refer to the rbd_spec structure used
 177 * by the parent rbd_dev for its own identity (i.e., the structure
 178 * is shared between the parent and child).
 179 *
 180 * Since these structures are populated once, during the discovery
 181 * phase of image construction, they are effectively immutable so
 182 * we make no effort to synchronize access to them.
 183 *
 184 * Note that code herein does not assume the image name is known (it
 185 * could be a null pointer).
 186 */
 187struct rbd_spec {
 188        u64             pool_id;
 189        const char      *pool_name;
 190        const char      *pool_ns;       /* NULL if default, never "" */
 191
 192        const char      *image_id;
 193        const char      *image_name;
 194
 195        u64             snap_id;
 196        const char      *snap_name;
 197
 198        struct kref     kref;
 199};
 200
 201/*
 202 * an instance of the client.  multiple devices may share an rbd client.
 203 */
 204struct rbd_client {
 205        struct ceph_client      *client;
 206        struct kref             kref;
 207        struct list_head        node;
 208};
 209
 210struct pending_result {
 211        int                     result;         /* first nonzero result */
 212        int                     num_pending;
 213};
 214
 215struct rbd_img_request;
 216
 217enum obj_request_type {
 218        OBJ_REQUEST_NODATA = 1,
 219        OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
 220        OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
 221        OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
 222};
 223
 224enum obj_operation_type {
 225        OBJ_OP_READ = 1,
 226        OBJ_OP_WRITE,
 227        OBJ_OP_DISCARD,
 228        OBJ_OP_ZEROOUT,
 229};
 230
 231#define RBD_OBJ_FLAG_DELETION                   (1U << 0)
 232#define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
 233#define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
 234#define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
 235#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
 236
 237enum rbd_obj_read_state {
 238        RBD_OBJ_READ_START = 1,
 239        RBD_OBJ_READ_OBJECT,
 240        RBD_OBJ_READ_PARENT,
 241};
 242
 243/*
 244 * Writes go through the following state machine to deal with
 245 * layering:
 246 *
 247 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 248 *            .                 |                                    .
 249 *            .                 v                                    .
 250 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 251 *            .                 |                    .               .
 252 *            .                 v                    v (deep-copyup  .
 253 *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
 254 * flattened) v                 |                    .               .
 255 *            .                 v                    .               .
 256 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 257 *                              |                        not needed) v
 258 *                              v                                    .
 259 *                            done . . . . . . . . . . . . . . . . . .
 260 *                              ^
 261 *                              |
 262 *                     RBD_OBJ_WRITE_FLAT
 263 *
 264 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 265 * assert_exists guard is needed or not (in some cases it's not needed
 266 * even if there is a parent).
 267 */
 268enum rbd_obj_write_state {
 269        RBD_OBJ_WRITE_START = 1,
 270        RBD_OBJ_WRITE_PRE_OBJECT_MAP,
 271        RBD_OBJ_WRITE_OBJECT,
 272        __RBD_OBJ_WRITE_COPYUP,
 273        RBD_OBJ_WRITE_COPYUP,
 274        RBD_OBJ_WRITE_POST_OBJECT_MAP,
 275};
 276
 277enum rbd_obj_copyup_state {
 278        RBD_OBJ_COPYUP_START = 1,
 279        RBD_OBJ_COPYUP_READ_PARENT,
 280        __RBD_OBJ_COPYUP_OBJECT_MAPS,
 281        RBD_OBJ_COPYUP_OBJECT_MAPS,
 282        __RBD_OBJ_COPYUP_WRITE_OBJECT,
 283        RBD_OBJ_COPYUP_WRITE_OBJECT,
 284};
 285
 286struct rbd_obj_request {
 287        struct ceph_object_extent ex;
 288        unsigned int            flags;  /* RBD_OBJ_FLAG_* */
 289        union {
 290                enum rbd_obj_read_state  read_state;    /* for reads */
 291                enum rbd_obj_write_state write_state;   /* for writes */
 292        };
 293
 294        struct rbd_img_request  *img_request;
 295        struct ceph_file_extent *img_extents;
 296        u32                     num_img_extents;
 297
 298        union {
 299                struct ceph_bio_iter    bio_pos;
 300                struct {
 301                        struct ceph_bvec_iter   bvec_pos;
 302                        u32                     bvec_count;
 303                        u32                     bvec_idx;
 304                };
 305        };
 306
 307        enum rbd_obj_copyup_state copyup_state;
 308        struct bio_vec          *copyup_bvecs;
 309        u32                     copyup_bvec_count;
 310
 311        struct list_head        osd_reqs;       /* w/ r_private_item */
 312
 313        struct mutex            state_mutex;
 314        struct pending_result   pending;
 315        struct kref             kref;
 316};
 317
 318enum img_req_flags {
 319        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
 320        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 321};
 322
 323enum rbd_img_state {
 324        RBD_IMG_START = 1,
 325        RBD_IMG_EXCLUSIVE_LOCK,
 326        __RBD_IMG_OBJECT_REQUESTS,
 327        RBD_IMG_OBJECT_REQUESTS,
 328};
 329
 330struct rbd_img_request {
 331        struct rbd_device       *rbd_dev;
 332        enum obj_operation_type op_type;
 333        enum obj_request_type   data_type;
 334        unsigned long           flags;
 335        enum rbd_img_state      state;
 336        union {
 337                u64                     snap_id;        /* for reads */
 338                struct ceph_snap_context *snapc;        /* for writes */
 339        };
 340        struct rbd_obj_request  *obj_request;   /* obj req initiator */
 341
 342        struct list_head        lock_item;
 343        struct list_head        object_extents; /* obj_req.ex structs */
 344
 345        struct mutex            state_mutex;
 346        struct pending_result   pending;
 347        struct work_struct      work;
 348        int                     work_result;
 349};
 350
 351#define for_each_obj_request(ireq, oreq) \
 352        list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 353#define for_each_obj_request_safe(ireq, oreq, n) \
 354        list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 355
 356enum rbd_watch_state {
 357        RBD_WATCH_STATE_UNREGISTERED,
 358        RBD_WATCH_STATE_REGISTERED,
 359        RBD_WATCH_STATE_ERROR,
 360};
 361
 362enum rbd_lock_state {
 363        RBD_LOCK_STATE_UNLOCKED,
 364        RBD_LOCK_STATE_LOCKED,
 365        RBD_LOCK_STATE_RELEASING,
 366};
 367
 368/* WatchNotify::ClientId */
 369struct rbd_client_id {
 370        u64 gid;
 371        u64 handle;
 372};
 373
 374struct rbd_mapping {
 375        u64                     size;
 376};
 377
 378/*
 379 * a single device
 380 */
 381struct rbd_device {
 382        int                     dev_id;         /* blkdev unique id */
 383
 384        int                     major;          /* blkdev assigned major */
 385        int                     minor;
 386        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 387
 388        u32                     image_format;   /* Either 1 or 2 */
 389        struct rbd_client       *rbd_client;
 390
 391        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 392
 393        spinlock_t              lock;           /* queue, flags, open_count */
 394
 395        struct rbd_image_header header;
 396        unsigned long           flags;          /* possibly lock protected */
 397        struct rbd_spec         *spec;
 398        struct rbd_options      *opts;
 399        char                    *config_info;   /* add{,_single_major} string */
 400
 401        struct ceph_object_id   header_oid;
 402        struct ceph_object_locator header_oloc;
 403
 404        struct ceph_file_layout layout;         /* used for all rbd requests */
 405
 406        struct mutex            watch_mutex;
 407        enum rbd_watch_state    watch_state;
 408        struct ceph_osd_linger_request *watch_handle;
 409        u64                     watch_cookie;
 410        struct delayed_work     watch_dwork;
 411
 412        struct rw_semaphore     lock_rwsem;
 413        enum rbd_lock_state     lock_state;
 414        char                    lock_cookie[32];
 415        struct rbd_client_id    owner_cid;
 416        struct work_struct      acquired_lock_work;
 417        struct work_struct      released_lock_work;
 418        struct delayed_work     lock_dwork;
 419        struct work_struct      unlock_work;
 420        spinlock_t              lock_lists_lock;
 421        struct list_head        acquiring_list;
 422        struct list_head        running_list;
 423        struct completion       acquire_wait;
 424        int                     acquire_err;
 425        struct completion       releasing_wait;
 426
 427        spinlock_t              object_map_lock;
 428        u8                      *object_map;
 429        u64                     object_map_size;        /* in objects */
 430        u64                     object_map_flags;
 431
 432        struct workqueue_struct *task_wq;
 433
 434        struct rbd_spec         *parent_spec;
 435        u64                     parent_overlap;
 436        atomic_t                parent_ref;
 437        struct rbd_device       *parent;
 438
 439        /* Block layer tags. */
 440        struct blk_mq_tag_set   tag_set;
 441
 442        /* protects updating the header */
 443        struct rw_semaphore     header_rwsem;
 444
 445        struct rbd_mapping      mapping;
 446
 447        struct list_head        node;
 448
 449        /* sysfs related */
 450        struct device           dev;
 451        unsigned long           open_count;     /* protected by lock */
 452};
 453
 454/*
 455 * Flag bits for rbd_dev->flags:
 456 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 457 *   by rbd_dev->lock
 458 */
 459enum rbd_dev_flags {
 460        RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
 461        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 462        RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
 463};
 464
 465static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 466
 467static LIST_HEAD(rbd_dev_list);    /* devices */
 468static DEFINE_SPINLOCK(rbd_dev_list_lock);
 469
 470static LIST_HEAD(rbd_client_list);              /* clients */
 471static DEFINE_SPINLOCK(rbd_client_list_lock);
 472
 473/* Slab caches for frequently-allocated structures */
 474
 475static struct kmem_cache        *rbd_img_request_cache;
 476static struct kmem_cache        *rbd_obj_request_cache;
 477
 478static int rbd_major;
 479static DEFINE_IDA(rbd_dev_id_ida);
 480
 481static struct workqueue_struct *rbd_wq;
 482
 483static struct ceph_snap_context rbd_empty_snapc = {
 484        .nref = REFCOUNT_INIT(1),
 485};
 486
 487/*
 488 * single-major requires >= 0.75 version of userspace rbd utility.
 489 */
 490static bool single_major = true;
 491module_param(single_major, bool, 0444);
 492MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 493
 494static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
 495static ssize_t remove_store(struct bus_type *bus, const char *buf,
 496                            size_t count);
 497static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
 498                                      size_t count);
 499static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
 500                                         size_t count);
 501static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 502
 503static int rbd_dev_id_to_minor(int dev_id)
 504{
 505        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
 506}
 507
 508static int minor_to_rbd_dev_id(int minor)
 509{
 510        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 511}
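
/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, dev_id 3 maps
 * to minor 3 << 4 = 48, and minors 48..63 cover rbd3 and up to 15 of
 * its partitions; minor_to_rbd_dev_id() recovers 3 from any of them.
 */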
 512
 513static bool rbd_is_ro(struct rbd_device *rbd_dev)
 514{
 515        return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
 516}
 517
 518static bool rbd_is_snap(struct rbd_device *rbd_dev)
 519{
 520        return rbd_dev->spec->snap_id != CEPH_NOSNAP;
 521}
 522
 523static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 524{
 525        lockdep_assert_held(&rbd_dev->lock_rwsem);
 526
 527        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 528               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 529}
 530
 531static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
 532{
 533        bool is_lock_owner;
 534
 535        down_read(&rbd_dev->lock_rwsem);
 536        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
 537        up_read(&rbd_dev->lock_rwsem);
 538        return is_lock_owner;
 539}
 540
 541static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 542{
 543        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 544}
 545
 546static BUS_ATTR_WO(add);
 547static BUS_ATTR_WO(remove);
 548static BUS_ATTR_WO(add_single_major);
 549static BUS_ATTR_WO(remove_single_major);
 550static BUS_ATTR_RO(supported_features);
 551
 552static struct attribute *rbd_bus_attrs[] = {
 553        &bus_attr_add.attr,
 554        &bus_attr_remove.attr,
 555        &bus_attr_add_single_major.attr,
 556        &bus_attr_remove_single_major.attr,
 557        &bus_attr_supported_features.attr,
 558        NULL,
 559};
 560
 561static umode_t rbd_bus_is_visible(struct kobject *kobj,
 562                                  struct attribute *attr, int index)
 563{
 564        if (!single_major &&
 565            (attr == &bus_attr_add_single_major.attr ||
 566             attr == &bus_attr_remove_single_major.attr))
 567                return 0;
 568
 569        return attr->mode;
 570}
 571
 572static const struct attribute_group rbd_bus_group = {
 573        .attrs = rbd_bus_attrs,
 574        .is_visible = rbd_bus_is_visible,
 575};
 576__ATTRIBUTE_GROUPS(rbd_bus);
 577
 578static struct bus_type rbd_bus_type = {
 579        .name           = "rbd",
 580        .bus_groups     = rbd_bus_groups,
 581};
 582
 583static void rbd_root_dev_release(struct device *dev)
 584{
 585}
 586
 587static struct device rbd_root_dev = {
 588        .init_name =    "rbd",
 589        .release =      rbd_root_dev_release,
 590};
 591
 592static __printf(2, 3)
 593void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 594{
 595        struct va_format vaf;
 596        va_list args;
 597
 598        va_start(args, fmt);
 599        vaf.fmt = fmt;
 600        vaf.va = &args;
 601
 602        if (!rbd_dev)
 603                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 604        else if (rbd_dev->disk)
 605                printk(KERN_WARNING "%s: %s: %pV\n",
 606                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 607        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 608                printk(KERN_WARNING "%s: image %s: %pV\n",
 609                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 610        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 611                printk(KERN_WARNING "%s: id %s: %pV\n",
 612                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 613        else    /* punt */
 614                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 615                        RBD_DRV_NAME, rbd_dev, &vaf);
 616        va_end(args);
 617}
 618
 619#ifdef RBD_DEBUG
 620#define rbd_assert(expr)                                                \
 621                if (unlikely(!(expr))) {                                \
 622                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 623                                                "at line %d:\n\n"       \
 624                                        "\trbd_assert(%s);\n\n",        \
 625                                        __func__, __LINE__, #expr);     \
 626                        BUG();                                          \
 627                }
 628#else /* !RBD_DEBUG */
 629#  define rbd_assert(expr)      ((void) 0)
 630#endif /* !RBD_DEBUG */
 631
 632static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 633
 634static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 635static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
 636static int rbd_dev_header_info(struct rbd_device *rbd_dev);
 637static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 638static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 639                                        u64 snap_id);
 640static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 641                                u8 *order, u64 *snap_size);
 642static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
 643
 644static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
 645static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
 646
 647/*
 648 * Return true if nothing else is pending.
 649 */
 650static bool pending_result_dec(struct pending_result *pending, int *result)
 651{
 652        rbd_assert(pending->num_pending > 0);
 653
 654        if (*result && !pending->result)
 655                pending->result = *result;
 656        if (--pending->num_pending)
 657                return false;
 658
 659        *result = pending->result;
 660        return true;
 661}
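
/*
 * Typical use, as a sketch: the issuer sets pending->num_pending to the
 * number of sub-requests it sends out, and each completion then does
 *
 *	if (pending_result_dec(&pending, &result))
 *		advance the state machine with the first non-zero result
 *
 * so only the final completion moves things forward.
 */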
 662
 663static int rbd_open(struct block_device *bdev, fmode_t mode)
 664{
 665        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 666        bool removing = false;
 667
 668        spin_lock_irq(&rbd_dev->lock);
 669        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 670                removing = true;
 671        else
 672                rbd_dev->open_count++;
 673        spin_unlock_irq(&rbd_dev->lock);
 674        if (removing)
 675                return -ENOENT;
 676
 677        (void) get_device(&rbd_dev->dev);
 678
 679        return 0;
 680}
 681
 682static void rbd_release(struct gendisk *disk, fmode_t mode)
 683{
 684        struct rbd_device *rbd_dev = disk->private_data;
 685        unsigned long open_count_before;
 686
 687        spin_lock_irq(&rbd_dev->lock);
 688        open_count_before = rbd_dev->open_count--;
 689        spin_unlock_irq(&rbd_dev->lock);
 690        rbd_assert(open_count_before > 0);
 691
 692        put_device(&rbd_dev->dev);
 693}
 694
 695static const struct block_device_operations rbd_bd_ops = {
 696        .owner                  = THIS_MODULE,
 697        .open                   = rbd_open,
 698        .release                = rbd_release,
 699};
 700
 701/*
 702 * Initialize an rbd client instance.  Success or not, this function
 703 * consumes ceph_opts.  Caller holds client_mutex.
 704 */
 705static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 706{
 707        struct rbd_client *rbdc;
 708        int ret = -ENOMEM;
 709
 710        dout("%s:\n", __func__);
 711        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 712        if (!rbdc)
 713                goto out_opt;
 714
 715        kref_init(&rbdc->kref);
 716        INIT_LIST_HEAD(&rbdc->node);
 717
 718        rbdc->client = ceph_create_client(ceph_opts, rbdc);
 719        if (IS_ERR(rbdc->client))
 720                goto out_rbdc;
 721        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 722
 723        ret = ceph_open_session(rbdc->client);
 724        if (ret < 0)
 725                goto out_client;
 726
 727        spin_lock(&rbd_client_list_lock);
 728        list_add_tail(&rbdc->node, &rbd_client_list);
 729        spin_unlock(&rbd_client_list_lock);
 730
 731        dout("%s: rbdc %p\n", __func__, rbdc);
 732
 733        return rbdc;
 734out_client:
 735        ceph_destroy_client(rbdc->client);
 736out_rbdc:
 737        kfree(rbdc);
 738out_opt:
 739        if (ceph_opts)
 740                ceph_destroy_options(ceph_opts);
 741        dout("%s: error %d\n", __func__, ret);
 742
 743        return ERR_PTR(ret);
 744}
 745
 746static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 747{
 748        kref_get(&rbdc->kref);
 749
 750        return rbdc;
 751}
 752
 753/*
 754 * Find a ceph client with specific addr and configuration.  If
 755 * found, bump its reference count.
 756 */
 757static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 758{
 759        struct rbd_client *client_node;
 760        bool found = false;
 761
 762        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 763                return NULL;
 764
 765        spin_lock(&rbd_client_list_lock);
 766        list_for_each_entry(client_node, &rbd_client_list, node) {
 767                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 768                        __rbd_get_client(client_node);
 769
 770                        found = true;
 771                        break;
 772                }
 773        }
 774        spin_unlock(&rbd_client_list_lock);
 775
 776        return found ? client_node : NULL;
 777}
 778
 779/*
 780 * (Per device) rbd map options
 781 */
 782enum {
 783        Opt_queue_depth,
 784        Opt_alloc_size,
 785        Opt_lock_timeout,
 786        /* int args above */
 787        Opt_pool_ns,
 788        Opt_compression_hint,
 789        /* string args above */
 790        Opt_read_only,
 791        Opt_read_write,
 792        Opt_lock_on_read,
 793        Opt_exclusive,
 794        Opt_notrim,
 795};
 796
 797enum {
 798        Opt_compression_hint_none,
 799        Opt_compression_hint_compressible,
 800        Opt_compression_hint_incompressible,
 801};
 802
 803static const struct constant_table rbd_param_compression_hint[] = {
 804        {"none",                Opt_compression_hint_none},
 805        {"compressible",        Opt_compression_hint_compressible},
 806        {"incompressible",      Opt_compression_hint_incompressible},
 807        {}
 808};
 809
 810static const struct fs_parameter_spec rbd_parameters[] = {
 811        fsparam_u32     ("alloc_size",                  Opt_alloc_size),
 812        fsparam_enum    ("compression_hint",            Opt_compression_hint,
 813                         rbd_param_compression_hint),
 814        fsparam_flag    ("exclusive",                   Opt_exclusive),
 815        fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
 816        fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
 817        fsparam_flag    ("notrim",                      Opt_notrim),
 818        fsparam_string  ("_pool_ns",                    Opt_pool_ns),
 819        fsparam_u32     ("queue_depth",                 Opt_queue_depth),
 820        fsparam_flag    ("read_only",                   Opt_read_only),
 821        fsparam_flag    ("read_write",                  Opt_read_write),
 822        fsparam_flag    ("ro",                          Opt_read_only),
 823        fsparam_flag    ("rw",                          Opt_read_write),
 824        {}
 825};
 826
 827struct rbd_options {
 828        int     queue_depth;
 829        int     alloc_size;
 830        unsigned long   lock_timeout;
 831        bool    read_only;
 832        bool    lock_on_read;
 833        bool    exclusive;
 834        bool    trim;
 835
 836        u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 837};
 838
 839#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
 840#define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
 841#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
 842#define RBD_READ_ONLY_DEFAULT   false
 843#define RBD_LOCK_ON_READ_DEFAULT false
 844#define RBD_EXCLUSIVE_DEFAULT   false
 845#define RBD_TRIM_DEFAULT        true
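
/*
 * Illustrative option string as accepted by the parser above (passed
 * via "rbd map ... -o" or the sysfs add interface):
 *
 *	queue_depth=256,alloc_size=65536,lock_on_read,read_only
 *
 * Integer and string options take key=value form; the rest are flags.
 */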
 846
 847struct rbd_parse_opts_ctx {
 848        struct rbd_spec         *spec;
 849        struct ceph_options     *copts;
 850        struct rbd_options      *opts;
 851};
 852
 853static char* obj_op_name(enum obj_operation_type op_type)
 854{
 855        switch (op_type) {
 856        case OBJ_OP_READ:
 857                return "read";
 858        case OBJ_OP_WRITE:
 859                return "write";
 860        case OBJ_OP_DISCARD:
 861                return "discard";
 862        case OBJ_OP_ZEROOUT:
 863                return "zeroout";
 864        default:
 865                return "???";
 866        }
 867}
 868
 869/*
 870 * Destroy ceph client
 871 *
 872 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 873 */
 874static void rbd_client_release(struct kref *kref)
 875{
 876        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 877
 878        dout("%s: rbdc %p\n", __func__, rbdc);
 879        spin_lock(&rbd_client_list_lock);
 880        list_del(&rbdc->node);
 881        spin_unlock(&rbd_client_list_lock);
 882
 883        ceph_destroy_client(rbdc->client);
 884        kfree(rbdc);
 885}
 886
 887/*
 888 * Drop reference to ceph client node. If it's not referenced anymore, release
 889 * it.
 890 */
 891static void rbd_put_client(struct rbd_client *rbdc)
 892{
 893        if (rbdc)
 894                kref_put(&rbdc->kref, rbd_client_release);
 895}
 896
 897/*
 898 * Get a ceph client with specific addr and configuration; if one does
 899 * not exist, create it.  Either way, ceph_opts is consumed by this
 900 * function.
 901 */
 902static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 903{
 904        struct rbd_client *rbdc;
 905        int ret;
 906
 907        mutex_lock(&client_mutex);
 908        rbdc = rbd_client_find(ceph_opts);
 909        if (rbdc) {
 910                ceph_destroy_options(ceph_opts);
 911
 912                /*
 913                 * Using an existing client.  Make sure ->pg_pools is up to
 914                 * date before we look up the pool id in do_rbd_add().
 915                 */
 916                ret = ceph_wait_for_latest_osdmap(rbdc->client,
 917                                        rbdc->client->options->mount_timeout);
 918                if (ret) {
 919                        rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
 920                        rbd_put_client(rbdc);
 921                        rbdc = ERR_PTR(ret);
 922                }
 923        } else {
 924                rbdc = rbd_client_create(ceph_opts);
 925        }
 926        mutex_unlock(&client_mutex);
 927
 928        return rbdc;
 929}
 930
 931static bool rbd_image_format_valid(u32 image_format)
 932{
 933        return image_format == 1 || image_format == 2;
 934}
 935
 936static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 937{
 938        size_t size;
 939        u32 snap_count;
 940
 941        /* The header has to start with the magic rbd header text */
 942        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 943                return false;
 944
 945        /* The bio layer requires at least sector-sized I/O */
 946
 947        if (ondisk->options.order < SECTOR_SHIFT)
 948                return false;
 949
 950        /* If we use u64 in a few spots we may be able to loosen this */
 951
 952        if (ondisk->options.order > 8 * sizeof (int) - 1)
 953                return false;
 954
 955        /*
 956         * The size of a snapshot header has to fit in a size_t, and
 957         * that limits the number of snapshots.
 958         */
 959        snap_count = le32_to_cpu(ondisk->snap_count);
 960        size = SIZE_MAX - sizeof (struct ceph_snap_context);
 961        if (snap_count > size / sizeof (__le64))
 962                return false;
 963
 964        /*
 965          * Not only that, but the size of the entire snapshot
 966         * header must also be representable in a size_t.
 967         */
 968        size -= snap_count * sizeof (__le64);
 969        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
 970                return false;
 971
 972        return true;
 973}
 974
 975/*
 976 * returns the size of an object in the image
 977 */
 978static u32 rbd_obj_bytes(struct rbd_image_header *header)
 979{
 980        return 1U << header->obj_order;
 981}
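
/*
 * For example, an image created with the default object order of 22
 * has rbd_obj_bytes() == 1U << 22 == 4 MiB.
 */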
 982
 983static void rbd_init_layout(struct rbd_device *rbd_dev)
 984{
 985        if (rbd_dev->header.stripe_unit == 0 ||
 986            rbd_dev->header.stripe_count == 0) {
 987                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
 988                rbd_dev->header.stripe_count = 1;
 989        }
 990
 991        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
 992        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
 993        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
 994        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
 995                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
 996        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 997}
 998
 999/*
1000 * Fill an rbd image header with information from the given format 1
1001 * on-disk header.
1002 */
1003static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1004                                 struct rbd_image_header_ondisk *ondisk)
1005{
1006        struct rbd_image_header *header = &rbd_dev->header;
1007        bool first_time = header->object_prefix == NULL;
1008        struct ceph_snap_context *snapc;
1009        char *object_prefix = NULL;
1010        char *snap_names = NULL;
1011        u64 *snap_sizes = NULL;
1012        u32 snap_count;
1013        int ret = -ENOMEM;
1014        u32 i;
1015
1016        /* Allocate this now to avoid having to handle failure below */
1017
1018        if (first_time) {
1019                object_prefix = kstrndup(ondisk->object_prefix,
1020                                         sizeof(ondisk->object_prefix),
1021                                         GFP_KERNEL);
1022                if (!object_prefix)
1023                        return -ENOMEM;
1024        }
1025
1026        /* Allocate the snapshot context and fill it in */
1027
1028        snap_count = le32_to_cpu(ondisk->snap_count);
1029        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1030        if (!snapc)
1031                goto out_err;
1032        snapc->seq = le64_to_cpu(ondisk->snap_seq);
1033        if (snap_count) {
1034                struct rbd_image_snap_ondisk *snaps;
1035                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1036
1037                /* We'll keep a copy of the snapshot names... */
1038
1039                if (snap_names_len > (u64)SIZE_MAX)
1040                        goto out_2big;
1041                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1042                if (!snap_names)
1043                        goto out_err;
1044
1045                /* ...as well as the array of their sizes. */
1046                snap_sizes = kmalloc_array(snap_count,
1047                                           sizeof(*header->snap_sizes),
1048                                           GFP_KERNEL);
1049                if (!snap_sizes)
1050                        goto out_err;
1051
1052                /*
1053                 * Copy the names, and fill in each snapshot's id
1054                 * and size.
1055                 *
1056                 * Note that rbd_dev_v1_header_info() guarantees the
1057                 * ondisk buffer we're working with has
1058                 * snap_names_len bytes beyond the end of the
1059                 * snapshot id array, this memcpy() is safe.
1060                 * snapshot id array, so this memcpy() is safe.
1061                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1062                snaps = ondisk->snaps;
1063                for (i = 0; i < snap_count; i++) {
1064                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1065                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1066                }
1067        }
1068
1069        /* We won't fail any more, fill in the header */
1070
1071        if (first_time) {
1072                header->object_prefix = object_prefix;
1073                header->obj_order = ondisk->options.order;
1074                rbd_init_layout(rbd_dev);
1075        } else {
1076                ceph_put_snap_context(header->snapc);
1077                kfree(header->snap_names);
1078                kfree(header->snap_sizes);
1079        }
1080
1081        /* The remaining fields always get updated (when we refresh) */
1082
1083        header->image_size = le64_to_cpu(ondisk->image_size);
1084        header->snapc = snapc;
1085        header->snap_names = snap_names;
1086        header->snap_sizes = snap_sizes;
1087
1088        return 0;
1089out_2big:
1090        ret = -EIO;
1091out_err:
1092        kfree(snap_sizes);
1093        kfree(snap_names);
1094        ceph_put_snap_context(snapc);
1095        kfree(object_prefix);
1096
1097        return ret;
1098}
1099
1100static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1101{
1102        const char *snap_name;
1103
1104        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1105
1106        /* Skip over names until we find the one we are looking for */
1107
1108        snap_name = rbd_dev->header.snap_names;
1109        while (which--)
1110                snap_name += strlen(snap_name) + 1;
1111
1112        return kstrdup(snap_name, GFP_KERNEL);
1113}
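
/*
 * Format 1 stores the snapshot names as consecutive NUL-terminated
 * strings, one per entry of header.snapc->snaps and in the same order
 * (e.g. "snap2\0snap1\0" for a two-snapshot image), which is why the
 * lookup above simply skips `which' strings.
 */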
1114
1115/*
1116 * Snapshot id comparison function for use with qsort()/bsearch().
1117 * Note that result is for snapshots in *descending* order.
1118 */
1119static int snapid_compare_reverse(const void *s1, const void *s2)
1120{
1121        u64 snap_id1 = *(u64 *)s1;
1122        u64 snap_id2 = *(u64 *)s2;
1123
1124        if (snap_id1 < snap_id2)
1125                return 1;
1126        return snap_id1 == snap_id2 ? 0 : -1;
1127}
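
/*
 * For example, comparing snapshot ids 8 and 5 (in that order) yields -1,
 * so sort()/bsearch() place 8 before 5, matching the descending order of
 * the snaps[] array.
 */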
1128
1129/*
1130 * Search a snapshot context to see if the given snapshot id is
1131 * present.
1132 *
1133 * Returns the position of the snapshot id in the array if it's found,
1134 * or BAD_SNAP_INDEX otherwise.
1135 *
1136 * Note: The snapshot array is kept sorted (by the osd) in
1137 * reverse order, highest snapshot id first.
1138 */
1139static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1140{
1141        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1142        u64 *found;
1143
1144        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1145                                sizeof (snap_id), snapid_compare_reverse);
1146
1147        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1148}
1149
1150static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1151                                        u64 snap_id)
1152{
1153        u32 which;
1154        const char *snap_name;
1155
1156        which = rbd_dev_snap_index(rbd_dev, snap_id);
1157        if (which == BAD_SNAP_INDEX)
1158                return ERR_PTR(-ENOENT);
1159
1160        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1161        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1162}
1163
1164static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1165{
1166        if (snap_id == CEPH_NOSNAP)
1167                return RBD_SNAP_HEAD_NAME;
1168
1169        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1170        if (rbd_dev->image_format == 1)
1171                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1172
1173        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1174}
1175
1176static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1177                                u64 *snap_size)
1178{
1179        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1180        if (snap_id == CEPH_NOSNAP) {
1181                *snap_size = rbd_dev->header.image_size;
1182        } else if (rbd_dev->image_format == 1) {
1183                u32 which;
1184
1185                which = rbd_dev_snap_index(rbd_dev, snap_id);
1186                if (which == BAD_SNAP_INDEX)
1187                        return -ENOENT;
1188
1189                *snap_size = rbd_dev->header.snap_sizes[which];
1190        } else {
1191                u64 size = 0;
1192                int ret;
1193
1194                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1195                if (ret)
1196                        return ret;
1197
1198                *snap_size = size;
1199        }
1200        return 0;
1201}
1202
1203static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1204{
1205        u64 snap_id = rbd_dev->spec->snap_id;
1206        u64 size = 0;
1207        int ret;
1208
1209        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1210        if (ret)
1211                return ret;
1212
1213        rbd_dev->mapping.size = size;
1214        return 0;
1215}
1216
1217static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1218{
1219        rbd_dev->mapping.size = 0;
1220}
1221
1222static void zero_bvec(struct bio_vec *bv)
1223{
1224        void *buf;
1225        unsigned long flags;
1226
1227        buf = bvec_kmap_irq(bv, &flags);
1228        memset(buf, 0, bv->bv_len);
1229        flush_dcache_page(bv->bv_page);
1230        bvec_kunmap_irq(buf, &flags);
1231}
1232
1233static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1234{
1235        struct ceph_bio_iter it = *bio_pos;
1236
1237        ceph_bio_iter_advance(&it, off);
1238        ceph_bio_iter_advance_step(&it, bytes, ({
1239                zero_bvec(&bv);
1240        }));
1241}
1242
1243static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1244{
1245        struct ceph_bvec_iter it = *bvec_pos;
1246
1247        ceph_bvec_iter_advance(&it, off);
1248        ceph_bvec_iter_advance_step(&it, bytes, ({
1249                zero_bvec(&bv);
1250        }));
1251}
1252
1253/*
1254 * Zero a range in @obj_req data buffer defined by a bio (list) or
1255 * (private) bio_vec array.
1256 *
1257 * @off is relative to the start of the data buffer.
1258 */
1259static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1260                               u32 bytes)
1261{
1262        dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1263
1264        switch (obj_req->img_request->data_type) {
1265        case OBJ_REQUEST_BIO:
1266                zero_bios(&obj_req->bio_pos, off, bytes);
1267                break;
1268        case OBJ_REQUEST_BVECS:
1269        case OBJ_REQUEST_OWN_BVECS:
1270                zero_bvecs(&obj_req->bvec_pos, off, bytes);
1271                break;
1272        default:
1273                BUG();
1274        }
1275}
1276
1277static void rbd_obj_request_destroy(struct kref *kref);
1278static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1279{
1280        rbd_assert(obj_request != NULL);
1281        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1282                kref_read(&obj_request->kref));
1283        kref_put(&obj_request->kref, rbd_obj_request_destroy);
1284}
1285
1286static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1287                                        struct rbd_obj_request *obj_request)
1288{
1289        rbd_assert(obj_request->img_request == NULL);
1290
1291        /* Image request now owns object's original reference */
1292        obj_request->img_request = img_request;
1293        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1294}
1295
1296static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1297                                        struct rbd_obj_request *obj_request)
1298{
1299        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1300        list_del(&obj_request->ex.oe_item);
1301        rbd_assert(obj_request->img_request == img_request);
1302        rbd_obj_request_put(obj_request);
1303}
1304
1305static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1306{
1307        struct rbd_obj_request *obj_req = osd_req->r_priv;
1308
1309        dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1310             __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1311             obj_req->ex.oe_off, obj_req->ex.oe_len);
1312        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1313}
1314
1315/*
1316 * The default/initial value for all image request flags is 0.  Each
1317 * is conditionally set to 1 at image request initialization time
1318 * and currently never change thereafter.
1319 */
1320static void img_request_layered_set(struct rbd_img_request *img_request)
1321{
1322        set_bit(IMG_REQ_LAYERED, &img_request->flags);
1323}
1324
1325static bool img_request_layered_test(struct rbd_img_request *img_request)
1326{
1327        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1328}
1329
1330static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1331{
1332        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1333
1334        return !obj_req->ex.oe_off &&
1335               obj_req->ex.oe_len == rbd_dev->layout.object_size;
1336}
1337
1338static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1339{
1340        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1341
1342        return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1343                                        rbd_dev->layout.object_size;
1344}
1345
1346/*
1347 * Must be called after rbd_obj_calc_img_extents().
1348 */
1349static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1350{
1351        if (!obj_req->num_img_extents ||
1352            (rbd_obj_is_entire(obj_req) &&
1353             !obj_req->img_request->snapc->num_snaps))
1354                return false;
1355
1356        return true;
1357}
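
/*
 * In other words, copyup only matters when the object is backed by
 * parent data, and even then a write that covers the entire object can
 * skip it unless snapshots require the deep-copyup behaviour.
 */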
1358
1359static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1360{
1361        return ceph_file_extents_bytes(obj_req->img_extents,
1362                                       obj_req->num_img_extents);
1363}
1364
1365static bool rbd_img_is_write(struct rbd_img_request *img_req)
1366{
1367        switch (img_req->op_type) {
1368        case OBJ_OP_READ:
1369                return false;
1370        case OBJ_OP_WRITE:
1371        case OBJ_OP_DISCARD:
1372        case OBJ_OP_ZEROOUT:
1373                return true;
1374        default:
1375                BUG();
1376        }
1377}
1378
1379static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1380{
1381        struct rbd_obj_request *obj_req = osd_req->r_priv;
1382        int result;
1383
1384        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1385             osd_req->r_result, obj_req);
1386
1387        /*
1388         * Writes aren't allowed to return a data payload.  In some
1389         * guarded write cases (e.g. stat + zero on an empty object)
1390         * a stat response makes it through, but we don't care.
1391         */
1392        if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1393                result = 0;
1394        else
1395                result = osd_req->r_result;
1396
1397        rbd_obj_handle_request(obj_req, result);
1398}
1399
1400static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1401{
1402        struct rbd_obj_request *obj_request = osd_req->r_priv;
1403        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1404        struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1405
1406        osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1407        osd_req->r_snapid = obj_request->img_request->snap_id;
1408}
1409
1410static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1411{
1412        struct rbd_obj_request *obj_request = osd_req->r_priv;
1413
1414        osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1415        ktime_get_real_ts64(&osd_req->r_mtime);
1416        osd_req->r_data_offset = obj_request->ex.oe_off;
1417}
1418
1419static struct ceph_osd_request *
1420__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1421                          struct ceph_snap_context *snapc, int num_ops)
1422{
1423        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1424        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1425        struct ceph_osd_request *req;
1426        const char *name_format = rbd_dev->image_format == 1 ?
1427                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1428        int ret;
1429
1430        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1431        if (!req)
1432                return ERR_PTR(-ENOMEM);
1433
1434        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1435        req->r_callback = rbd_osd_req_callback;
1436        req->r_priv = obj_req;
1437
1438        /*
1439         * Data objects may be stored in a separate pool, but always in
1440         * the same namespace in that pool as the header in its pool.
1441         */
1442        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1443        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1444
1445        ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1446                               rbd_dev->header.object_prefix,
1447                               obj_req->ex.oe_objno);
1448        if (ret)
1449                return ERR_PTR(ret);
1450
1451        return req;
1452}
1453
1454static struct ceph_osd_request *
1455rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1456{
1457        return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1458                                         num_ops);
1459}
1460
1461static struct rbd_obj_request *rbd_obj_request_create(void)
1462{
1463        struct rbd_obj_request *obj_request;
1464
1465        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1466        if (!obj_request)
1467                return NULL;
1468
1469        ceph_object_extent_init(&obj_request->ex);
1470        INIT_LIST_HEAD(&obj_request->osd_reqs);
1471        mutex_init(&obj_request->state_mutex);
1472        kref_init(&obj_request->kref);
1473
1474        dout("%s %p\n", __func__, obj_request);
1475        return obj_request;
1476}
1477
1478static void rbd_obj_request_destroy(struct kref *kref)
1479{
1480        struct rbd_obj_request *obj_request;
1481        struct ceph_osd_request *osd_req;
1482        u32 i;
1483
1484        obj_request = container_of(kref, struct rbd_obj_request, kref);
1485
1486        dout("%s: obj %p\n", __func__, obj_request);
1487
1488        while (!list_empty(&obj_request->osd_reqs)) {
1489                osd_req = list_first_entry(&obj_request->osd_reqs,
1490                                    struct ceph_osd_request, r_private_item);
1491                list_del_init(&osd_req->r_private_item);
1492                ceph_osdc_put_request(osd_req);
1493        }
1494
1495        switch (obj_request->img_request->data_type) {
1496        case OBJ_REQUEST_NODATA:
1497        case OBJ_REQUEST_BIO:
1498        case OBJ_REQUEST_BVECS:
1499                break;          /* Nothing to do */
1500        case OBJ_REQUEST_OWN_BVECS:
1501                kfree(obj_request->bvec_pos.bvecs);
1502                break;
1503        default:
1504                BUG();
1505        }
1506
1507        kfree(obj_request->img_extents);
1508        if (obj_request->copyup_bvecs) {
1509                for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1510                        if (obj_request->copyup_bvecs[i].bv_page)
1511                                __free_page(obj_request->copyup_bvecs[i].bv_page);
1512                }
1513                kfree(obj_request->copyup_bvecs);
1514        }
1515
1516        kmem_cache_free(rbd_obj_request_cache, obj_request);
1517}
1518
1519/* It's OK to call this for a device with no parent */
1520
1521static void rbd_spec_put(struct rbd_spec *spec);
1522static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1523{
1524        rbd_dev_remove_parent(rbd_dev);
1525        rbd_spec_put(rbd_dev->parent_spec);
1526        rbd_dev->parent_spec = NULL;
1527        rbd_dev->parent_overlap = 0;
1528}
1529
1530/*
1531 * Parent image reference counting is used to determine when an
1532 * image's parent fields can be safely torn down--after there are no
1533 * more in-flight requests to the parent image.  When the last
1534 * reference is dropped, cleaning them up is safe.
1535 */
1536static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1537{
1538        int counter;
1539
1540        if (!rbd_dev->parent_spec)
1541                return;
1542
1543        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1544        if (counter > 0)
1545                return;
1546
1547        /* Last reference; clean up parent data structures */
1548
1549        if (!counter)
1550                rbd_dev_unparent(rbd_dev);
1551        else
1552                rbd_warn(rbd_dev, "parent reference underflow");
1553}
1554
1555/*
1556 * If an image has a non-zero parent overlap, get a reference to its
1557 * parent.
1558 *
1559 * Returns true if the rbd device has a parent with a non-zero
1560 * overlap and a reference for it was successfully taken, or
1561 * false otherwise.
1562 */
1563static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1564{
1565        int counter = 0;
1566
1567        if (!rbd_dev->parent_spec)
1568                return false;
1569
1570        if (rbd_dev->parent_overlap)
1571                counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1572
1573        if (counter < 0)
1574                rbd_warn(rbd_dev, "parent reference overflow");
1575
1576        return counter > 0;
1577}
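
/*
 * A sketch of the intended pairing (the real callers are
 * rbd_img_capture_header() and rbd_img_request_destroy() below):
 *
 *	if (rbd_dev_parent_get(rbd_dev)) {
 *		... this request may read from rbd_dev->parent ...
 *		rbd_dev_parent_put(rbd_dev);
 *	}
 *
 * Once the count drops to zero the parent linkage is torn down and
 * further rbd_dev_parent_get() calls return false.
 */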
1578
1579static void rbd_img_request_init(struct rbd_img_request *img_request,
1580                                 struct rbd_device *rbd_dev,
1581                                 enum obj_operation_type op_type)
1582{
1583        memset(img_request, 0, sizeof(*img_request));
1584
1585        img_request->rbd_dev = rbd_dev;
1586        img_request->op_type = op_type;
1587
1588        INIT_LIST_HEAD(&img_request->lock_item);
1589        INIT_LIST_HEAD(&img_request->object_extents);
1590        mutex_init(&img_request->state_mutex);
1591}
1592
1593static void rbd_img_capture_header(struct rbd_img_request *img_req)
1594{
1595        struct rbd_device *rbd_dev = img_req->rbd_dev;
1596
1597        lockdep_assert_held(&rbd_dev->header_rwsem);
1598
1599        if (rbd_img_is_write(img_req))
1600                img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1601        else
1602                img_req->snap_id = rbd_dev->spec->snap_id;
1603
1604        if (rbd_dev_parent_get(rbd_dev))
1605                img_request_layered_set(img_req);
1606}
1607
1608static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1609{
1610        struct rbd_obj_request *obj_request;
1611        struct rbd_obj_request *next_obj_request;
1612
1613        dout("%s: img %p\n", __func__, img_request);
1614
1615        WARN_ON(!list_empty(&img_request->lock_item));
1616        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1617                rbd_img_obj_request_del(img_request, obj_request);
1618
1619        if (img_request_layered_test(img_request))
1620                rbd_dev_parent_put(img_request->rbd_dev);
1621
1622        if (rbd_img_is_write(img_request))
1623                ceph_put_snap_context(img_request->snapc);
1624
1625        if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1626                kmem_cache_free(rbd_img_request_cache, img_request);
1627}
1628
1629#define BITS_PER_OBJ    2
1630#define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1631#define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1632
1633static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1634                                   u64 *index, u8 *shift)
1635{
1636        u32 off;
1637
1638        rbd_assert(objno < rbd_dev->object_map_size);
1639        *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1640        *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1641}
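
/*
 * Worked example for the index/shift math above: with BITS_PER_OBJ == 2
 * a byte holds 4 object states, so for objno == 5
 *
 *   index = 5 / 4 = 1,  off = 5 % 4 = 1
 *   shift = (4 - 1 - 1) * 2 = 4
 *
 * i.e. the state of object 5 lives in bits 5..4 of object_map[1], with
 * object 4 occupying the top two bits of that byte.
 */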
1642
1643static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1644{
1645        u64 index;
1646        u8 shift;
1647
1648        lockdep_assert_held(&rbd_dev->object_map_lock);
1649        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1650        return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1651}
1652
1653static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1654{
1655        u64 index;
1656        u8 shift;
1657        u8 *p;
1658
1659        lockdep_assert_held(&rbd_dev->object_map_lock);
1660        rbd_assert(!(val & ~OBJ_MASK));
1661
1662        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1663        p = &rbd_dev->object_map[index];
1664        *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1665}
1666
1667static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1668{
1669        u8 state;
1670
1671        spin_lock(&rbd_dev->object_map_lock);
1672        state = __rbd_object_map_get(rbd_dev, objno);
1673        spin_unlock(&rbd_dev->object_map_lock);
1674        return state;
1675}
1676
1677static bool use_object_map(struct rbd_device *rbd_dev)
1678{
1679        /*
1680         * An image mapped read-only can't use the object map -- it isn't
1681         * loaded because the header lock isn't acquired.  Someone else can
1682         * write to the image and update the object map behind our back.
1683         *
1684         * A snapshot can't be written to, so using the object map is always
1685         * safe.
1686         */
1687        if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1688                return false;
1689
1690        return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1691                !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1692}
1693
1694static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1695{
1696        u8 state;
1697
1698        /* fall back to default logic if object map is disabled or invalid */
1699        if (!use_object_map(rbd_dev))
1700                return true;
1701
1702        state = rbd_object_map_get(rbd_dev, objno);
1703        return state != OBJECT_NONEXISTENT;
1704}
1705
1706static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1707                                struct ceph_object_id *oid)
1708{
1709        if (snap_id == CEPH_NOSNAP)
1710                ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1711                                rbd_dev->spec->image_id);
1712        else
1713                ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1714                                rbd_dev->spec->image_id, snap_id);
1715}
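
/*
 * For example, an image with image_id "abc123" (illustrative value) gets
 * object map object names of the form
 *
 *   rbd_object_map.abc123                    for HEAD (CEPH_NOSNAP)
 *   rbd_object_map.abc123.0000000000000004   for the snapshot with id 4
 *
 * assuming RBD_OBJECT_MAP_PREFIX is "rbd_object_map." (see rbd_types.h).
 */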
1716
1717static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1718{
1719        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1720        CEPH_DEFINE_OID_ONSTACK(oid);
1721        u8 lock_type;
1722        char *lock_tag;
1723        struct ceph_locker *lockers;
1724        u32 num_lockers;
1725        bool broke_lock = false;
1726        int ret;
1727
1728        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1729
1730again:
1731        ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1732                            CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1733        if (ret != -EBUSY || broke_lock) {
1734                if (ret == -EEXIST)
1735                        ret = 0; /* already locked by myself */
1736                if (ret)
1737                        rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1738                return ret;
1739        }
1740
1741        ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1742                                 RBD_LOCK_NAME, &lock_type, &lock_tag,
1743                                 &lockers, &num_lockers);
1744        if (ret) {
1745                if (ret == -ENOENT)
1746                        goto again;
1747
1748                rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1749                return ret;
1750        }
1751
1752        kfree(lock_tag);
1753        if (num_lockers == 0)
1754                goto again;
1755
1756        rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1757                 ENTITY_NAME(lockers[0].id.name));
1758
1759        ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1760                                  RBD_LOCK_NAME, lockers[0].id.cookie,
1761                                  &lockers[0].id.name);
1762        ceph_free_lockers(lockers, num_lockers);
1763        if (ret) {
1764                if (ret == -ENOENT)
1765                        goto again;
1766
1767                rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1768                return ret;
1769        }
1770
1771        broke_lock = true;
1772        goto again;
1773}
1774
1775static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1776{
1777        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1778        CEPH_DEFINE_OID_ONSTACK(oid);
1779        int ret;
1780
1781        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1782
1783        ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1784                              "");
1785        if (ret && ret != -ENOENT)
1786                rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1787}
1788
1789static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1790{
1791        u8 struct_v;
1792        u32 struct_len;
1793        u32 header_len;
1794        void *header_end;
1795        int ret;
1796
1797        ceph_decode_32_safe(p, end, header_len, e_inval);
1798        header_end = *p + header_len;
1799
1800        ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1801                                  &struct_len);
1802        if (ret)
1803                return ret;
1804
1805        ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1806
1807        *p = header_end;
1808        return 0;
1809
1810e_inval:
1811        return -EINVAL;
1812}
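
/*
 * The layout decoded above is, in order: an le32 byte count for the whole
 * header, a versioned "BitVector header" envelope (struct_v, compat and
 * length, consumed by ceph_start_decoding()), and an le64 element count
 * which becomes *object_map_size.  Any remaining header fields are
 * skipped by jumping to header_end; the bit data itself follows the
 * header in the reply.
 */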
1813
1814static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1815{
1816        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1817        CEPH_DEFINE_OID_ONSTACK(oid);
1818        struct page **pages;
1819        void *p, *end;
1820        size_t reply_len;
1821        u64 num_objects;
1822        u64 object_map_bytes;
1823        u64 object_map_size;
1824        int num_pages;
1825        int ret;
1826
1827        rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1828
1829        num_objects = ceph_get_num_objects(&rbd_dev->layout,
1830                                           rbd_dev->mapping.size);
1831        object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1832                                            BITS_PER_BYTE);
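        /*
         * Illustrative sizing: a 1 GiB mapping with 4 MiB objects has
         * 256 objects, so object_map_bytes is 256 * 2 / 8 = 64; the
         * extra page below leaves headroom for the reply header that
         * precedes the bit data.
         */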
1833        num_pages = calc_pages_for(0, object_map_bytes) + 1;
1834        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1835        if (IS_ERR(pages))
1836                return PTR_ERR(pages);
1837
1838        reply_len = num_pages * PAGE_SIZE;
1839        rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1840        ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1841                             "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1842                             NULL, 0, pages, &reply_len);
1843        if (ret)
1844                goto out;
1845
1846        p = page_address(pages[0]);
1847        end = p + min(reply_len, (size_t)PAGE_SIZE);
1848        ret = decode_object_map_header(&p, end, &object_map_size);
1849        if (ret)
1850                goto out;
1851
1852        if (object_map_size != num_objects) {
1853                rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1854                         object_map_size, num_objects);
1855                ret = -EINVAL;
1856                goto out;
1857        }
1858
1859        if (offset_in_page(p) + object_map_bytes > reply_len) {
1860                ret = -EINVAL;
1861                goto out;
1862        }
1863
1864        rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1865        if (!rbd_dev->object_map) {
1866                ret = -ENOMEM;
1867                goto out;
1868        }
1869
1870        rbd_dev->object_map_size = object_map_size;
1871        ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1872                                   offset_in_page(p), object_map_bytes);
1873
1874out:
1875        ceph_release_page_vector(pages, num_pages);
1876        return ret;
1877}
1878
1879static void rbd_object_map_free(struct rbd_device *rbd_dev)
1880{
1881        kvfree(rbd_dev->object_map);
1882        rbd_dev->object_map = NULL;
1883        rbd_dev->object_map_size = 0;
1884}
1885
1886static int rbd_object_map_load(struct rbd_device *rbd_dev)
1887{
1888        int ret;
1889
1890        ret = __rbd_object_map_load(rbd_dev);
1891        if (ret)
1892                return ret;
1893
1894        ret = rbd_dev_v2_get_flags(rbd_dev);
1895        if (ret) {
1896                rbd_object_map_free(rbd_dev);
1897                return ret;
1898        }
1899
1900        if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1901                rbd_warn(rbd_dev, "object map is invalid");
1902
1903        return 0;
1904}
1905
1906static int rbd_object_map_open(struct rbd_device *rbd_dev)
1907{
1908        int ret;
1909
1910        ret = rbd_object_map_lock(rbd_dev);
1911        if (ret)
1912                return ret;
1913
1914        ret = rbd_object_map_load(rbd_dev);
1915        if (ret) {
1916                rbd_object_map_unlock(rbd_dev);
1917                return ret;
1918        }
1919
1920        return 0;
1921}
1922
1923static void rbd_object_map_close(struct rbd_device *rbd_dev)
1924{
1925        rbd_object_map_free(rbd_dev);
1926        rbd_object_map_unlock(rbd_dev);
1927}
1928
1929/*
1930 * This function needs snap_id (or more precisely just something to
1931 * distinguish between HEAD and snapshot object maps), new_state and
1932 * current_state that were passed to rbd_object_map_update().
1933 *
1934 * To avoid allocating and stashing a context we piggyback on the OSD
1935 * request.  A HEAD update has two ops (assert_locked + object_map_update),
1936 * a snapshot update has just one.  For new_state and current_state we
1937 * decode our own object_map_update op, encoded in rbd_cls_object_map_update().

1938 */
1939static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1940                                        struct ceph_osd_request *osd_req)
1941{
1942        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1943        struct ceph_osd_data *osd_data;
1944        u64 objno;
1945        u8 state, new_state, current_state;
1946        bool has_current_state;
1947        void *p;
1948
1949        if (osd_req->r_result)
1950                return osd_req->r_result;
1951
1952        /*
1953         * Nothing to do for a snapshot object map.
1954         */
1955        if (osd_req->r_num_ops == 1)
1956                return 0;
1957
1958        /*
1959         * Update in-memory HEAD object map.
1960         */
1961        rbd_assert(osd_req->r_num_ops == 2);
1962        osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1963        rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1964
1965        p = page_address(osd_data->pages[0]);
1966        objno = ceph_decode_64(&p);
1967        rbd_assert(objno == obj_req->ex.oe_objno);
1968        rbd_assert(ceph_decode_64(&p) == objno + 1);
1969        new_state = ceph_decode_8(&p);
1970        has_current_state = ceph_decode_8(&p);
1971        if (has_current_state)
1972                current_state = ceph_decode_8(&p);
1973
1974        spin_lock(&rbd_dev->object_map_lock);
1975        state = __rbd_object_map_get(rbd_dev, objno);
1976        if (!has_current_state || current_state == state ||
1977            (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1978                __rbd_object_map_set(rbd_dev, objno, new_state);
1979        spin_unlock(&rbd_dev->object_map_lock);
1980
1981        return 0;
1982}
1983
1984static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1985{
1986        struct rbd_obj_request *obj_req = osd_req->r_priv;
1987        int result;
1988
1989        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1990             osd_req->r_result, obj_req);
1991
1992        result = rbd_object_map_update_finish(obj_req, osd_req);
1993        rbd_obj_handle_request(obj_req, result);
1994}
1995
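/*
 * Decide whether a HEAD object map update to @new_state is actually
 * needed for @objno: it isn't if the state is already @new_state, if
 * we'd mark a nonexistent object as pending deletion, or if we'd mark
 * an object nonexistent without it having gone through OBJECT_PENDING
 * first.
 */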
1996static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
1997{
1998        u8 state = rbd_object_map_get(rbd_dev, objno);
1999
2000        if (state == new_state ||
2001            (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2002            (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2003                return false;
2004
2005        return true;
2006}
2007
2008static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2009                                     int which, u64 objno, u8 new_state,
2010                                     const u8 *current_state)
2011{
2012        struct page **pages;
2013        void *p, *start;
2014        int ret;
2015
2016        ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2017        if (ret)
2018                return ret;
2019
2020        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2021        if (IS_ERR(pages))
2022                return PTR_ERR(pages);
2023
2024        p = start = page_address(pages[0]);
2025        ceph_encode_64(&p, objno);
2026        ceph_encode_64(&p, objno + 1);
2027        ceph_encode_8(&p, new_state);
2028        if (current_state) {
2029                ceph_encode_8(&p, 1);
2030                ceph_encode_8(&p, *current_state);
2031        } else {
2032                ceph_encode_8(&p, 0);
2033        }
2034
2035        osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2036                                          false, true);
2037        return 0;
2038}
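
/*
 * The payload encoded above is what rbd_object_map_update_finish()
 * later re-decodes from the request data pages:
 *
 *   le64  start objno        (objno)
 *   le64  end objno          (objno + 1, i.e. a single-object range)
 *   u8    new_state
 *   u8    has_current_state
 *   u8    current_state      (present only if has_current_state != 0)
 */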
2039
2040/*
2041 * Return:
2042 *   0 - object map update sent
2043 *   1 - object map update isn't needed
2044 *  <0 - error
2045 */
2046static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2047                                 u8 new_state, const u8 *current_state)
2048{
2049        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2050        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2051        struct ceph_osd_request *req;
2052        int num_ops = 1;
2053        int which = 0;
2054        int ret;
2055
2056        if (snap_id == CEPH_NOSNAP) {
2057                if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2058                        return 1;
2059
2060                num_ops++; /* assert_locked */
2061        }
2062
2063        req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2064        if (!req)
2065                return -ENOMEM;
2066
2067        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2068        req->r_callback = rbd_object_map_callback;
2069        req->r_priv = obj_req;
2070
2071        rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2072        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2073        req->r_flags = CEPH_OSD_FLAG_WRITE;
2074        ktime_get_real_ts64(&req->r_mtime);
2075
2076        if (snap_id == CEPH_NOSNAP) {
2077                /*
2078                 * Protect against possible race conditions during lock
2079                 * ownership transitions.
2080                 */
2081                ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2082                                             CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2083                if (ret)
2084                        return ret;
2085        }
2086
2087        ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2088                                        new_state, current_state);
2089        if (ret)
2090                return ret;
2091
2092        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2093        if (ret)
2094                return ret;
2095
2096        ceph_osdc_start_request(osdc, req, false);
2097        return 0;
2098}
2099
2100static void prune_extents(struct ceph_file_extent *img_extents,
2101                          u32 *num_img_extents, u64 overlap)
2102{
2103        u32 cnt = *num_img_extents;
2104
2105        /* drop extents completely beyond the overlap */
2106        while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2107                cnt--;
2108
2109        if (cnt) {
2110                struct ceph_file_extent *ex = &img_extents[cnt - 1];
2111
2112                /* trim final overlapping extent */
2113                if (ex->fe_off + ex->fe_len > overlap)
2114                        ex->fe_len = overlap - ex->fe_off;
2115        }
2116
2117        *num_img_extents = cnt;
2118}
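
/*
 * Illustrative example: with overlap = 4M, extents [0M~1M, 3M~2M] become
 * [0M~1M, 3M~1M] (the tail extent is trimmed to the overlap), while a
 * lone [5M~1M] extent is dropped and *num_img_extents becomes 0.
 */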
2119
2120/*
2121 * Determine the byte range(s) covered by either just the object extent
2122 * or the entire object in the parent image.
2123 */
2124static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2125                                    bool entire)
2126{
2127        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2128        int ret;
2129
2130        if (!rbd_dev->parent_overlap)
2131                return 0;
2132
2133        ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2134                                  entire ? 0 : obj_req->ex.oe_off,
2135                                  entire ? rbd_dev->layout.object_size :
2136                                                        obj_req->ex.oe_len,
2137                                  &obj_req->img_extents,
2138                                  &obj_req->num_img_extents);
2139        if (ret)
2140                return ret;
2141
2142        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2143                      rbd_dev->parent_overlap);
2144        return 0;
2145}
2146
2147static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2148{
2149        struct rbd_obj_request *obj_req = osd_req->r_priv;
2150
2151        switch (obj_req->img_request->data_type) {
2152        case OBJ_REQUEST_BIO:
2153                osd_req_op_extent_osd_data_bio(osd_req, which,
2154                                               &obj_req->bio_pos,
2155                                               obj_req->ex.oe_len);
2156                break;
2157        case OBJ_REQUEST_BVECS:
2158        case OBJ_REQUEST_OWN_BVECS:
2159                rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2160                                                        obj_req->ex.oe_len);
2161                rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2162                osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2163                                                    &obj_req->bvec_pos);
2164                break;
2165        default:
2166                BUG();
2167        }
2168}
2169
2170static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2171{
2172        struct page **pages;
2173
2174        /*
2175         * The response data for a STAT call consists of:
2176         *     le64 length;
2177         *     struct {
2178         *         le32 tv_sec;
2179         *         le32 tv_nsec;
2180         *     } mtime;
2181         */
2182        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2183        if (IS_ERR(pages))
2184                return PTR_ERR(pages);
2185
2186        osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2187        osd_req_op_raw_data_in_pages(osd_req, which, pages,
2188                                     8 + sizeof(struct ceph_timespec),
2189                                     0, false, true);
2190        return 0;
2191}
2192
2193static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2194                                u32 bytes)
2195{
2196        struct rbd_obj_request *obj_req = osd_req->r_priv;
2197        int ret;
2198
2199        ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2200        if (ret)
2201                return ret;
2202
2203        osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2204                                          obj_req->copyup_bvec_count, bytes);
2205        return 0;
2206}
2207
2208static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2209{
2210        obj_req->read_state = RBD_OBJ_READ_START;
2211        return 0;
2212}
2213
2214static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2215                                      int which)
2216{
2217        struct rbd_obj_request *obj_req = osd_req->r_priv;
2218        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2219        u16 opcode;
2220
2221        if (!use_object_map(rbd_dev) ||
2222            !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2223                osd_req_op_alloc_hint_init(osd_req, which++,
2224                                           rbd_dev->layout.object_size,
2225                                           rbd_dev->layout.object_size,
2226                                           rbd_dev->opts->alloc_hint_flags);
2227        }
2228
2229        if (rbd_obj_is_entire(obj_req))
2230                opcode = CEPH_OSD_OP_WRITEFULL;
2231        else
2232                opcode = CEPH_OSD_OP_WRITE;
2233
2234        osd_req_op_extent_init(osd_req, which, opcode,
2235                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2236        rbd_osd_setup_data(osd_req, which);
2237}
2238
2239static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2240{
2241        int ret;
2242
2243        /* reverse map the entire object onto the parent */
2244        ret = rbd_obj_calc_img_extents(obj_req, true);
2245        if (ret)
2246                return ret;
2247
2248        if (rbd_obj_copyup_enabled(obj_req))
2249                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2250
2251        obj_req->write_state = RBD_OBJ_WRITE_START;
2252        return 0;
2253}
2254
2255static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2256{
2257        return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2258                                          CEPH_OSD_OP_ZERO;
2259}
2260
2261static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2262                                        int which)
2263{
2264        struct rbd_obj_request *obj_req = osd_req->r_priv;
2265
2266        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2267                rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2268                osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2269        } else {
2270                osd_req_op_extent_init(osd_req, which,
2271                                       truncate_or_zero_opcode(obj_req),
2272                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2273                                       0, 0);
2274        }
2275}
2276
2277static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2278{
2279        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2280        u64 off, next_off;
2281        int ret;
2282
2283        /*
2284         * Align the range to alloc_size boundary and punt on discards
2285         * that are too small to free up any space.
2286         *
2287         * alloc_size == object_size && is_tail() is a special case for
2288         * filestore with filestore_punch_hole = false, needed to allow
2289         * truncate (in addition to delete).
2290         */
2291        if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2292            !rbd_obj_is_tail(obj_req)) {
2293                off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2294                next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2295                                      rbd_dev->opts->alloc_size);
2296                if (off >= next_off)
2297                        return 1;
2298
2299                dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2300                     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2301                     off, next_off - off);
2302                obj_req->ex.oe_off = off;
2303                obj_req->ex.oe_len = next_off - off;
2304        }
2305
2306        /* reverse map the entire object onto the parent */
2307        ret = rbd_obj_calc_img_extents(obj_req, true);
2308        if (ret)
2309                return ret;
2310
2311        obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2312        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2313                obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2314
2315        obj_req->write_state = RBD_OBJ_WRITE_START;
2316        return 0;
2317}
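
/*
 * Alignment example for rbd_obj_init_discard() above (illustrative
 * numbers, alloc_size = 64K): a 10K~200K discard is shrunk to 64K~128K,
 * whereas a 10K~100K discard doesn't cover a whole 64K unit, so the
 * object request is dropped (return 1).
 */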
2318
2319static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2320                                        int which)
2321{
2322        struct rbd_obj_request *obj_req = osd_req->r_priv;
2323        u16 opcode;
2324
2325        if (rbd_obj_is_entire(obj_req)) {
2326                if (obj_req->num_img_extents) {
2327                        if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2328                                osd_req_op_init(osd_req, which++,
2329                                                CEPH_OSD_OP_CREATE, 0);
2330                        opcode = CEPH_OSD_OP_TRUNCATE;
2331                } else {
2332                        rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2333                        osd_req_op_init(osd_req, which++,
2334                                        CEPH_OSD_OP_DELETE, 0);
2335                        opcode = 0;
2336                }
2337        } else {
2338                opcode = truncate_or_zero_opcode(obj_req);
2339        }
2340
2341        if (opcode)
2342                osd_req_op_extent_init(osd_req, which, opcode,
2343                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2344                                       0, 0);
2345}
2346
2347static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2348{
2349        int ret;
2350
2351        /* reverse map the entire object onto the parent */
2352        ret = rbd_obj_calc_img_extents(obj_req, true);
2353        if (ret)
2354                return ret;
2355
2356        if (rbd_obj_copyup_enabled(obj_req))
2357                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2358        if (!obj_req->num_img_extents) {
2359                obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2360                if (rbd_obj_is_entire(obj_req))
2361                        obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2362        }
2363
2364        obj_req->write_state = RBD_OBJ_WRITE_START;
2365        return 0;
2366}
2367
2368static int count_write_ops(struct rbd_obj_request *obj_req)
2369{
2370        struct rbd_img_request *img_req = obj_req->img_request;
2371
2372        switch (img_req->op_type) {
2373        case OBJ_OP_WRITE:
2374                if (!use_object_map(img_req->rbd_dev) ||
2375                    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2376                        return 2; /* setallochint + write/writefull */
2377
2378                return 1; /* write/writefull */
2379        case OBJ_OP_DISCARD:
2380                return 1; /* delete/truncate/zero */
2381        case OBJ_OP_ZEROOUT:
2382                if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2383                    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2384                        return 2; /* create + truncate */
2385
2386                return 1; /* delete/truncate/zero */
2387        default:
2388                BUG();
2389        }
2390}
2391
2392static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2393                                    int which)
2394{
2395        struct rbd_obj_request *obj_req = osd_req->r_priv;
2396
2397        switch (obj_req->img_request->op_type) {
2398        case OBJ_OP_WRITE:
2399                __rbd_osd_setup_write_ops(osd_req, which);
2400                break;
2401        case OBJ_OP_DISCARD:
2402                __rbd_osd_setup_discard_ops(osd_req, which);
2403                break;
2404        case OBJ_OP_ZEROOUT:
2405                __rbd_osd_setup_zeroout_ops(osd_req, which);
2406                break;
2407        default:
2408                BUG();
2409        }
2410}
2411
2412/*
2413 * Prune the list of object requests (adjust offset and/or length, drop
2414 * redundant requests).  Prepare object request state machines and image
2415 * request state machine for execution.
2416 */
2417static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2418{
2419        struct rbd_obj_request *obj_req, *next_obj_req;
2420        int ret;
2421
2422        for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2423                switch (img_req->op_type) {
2424                case OBJ_OP_READ:
2425                        ret = rbd_obj_init_read(obj_req);
2426                        break;
2427                case OBJ_OP_WRITE:
2428                        ret = rbd_obj_init_write(obj_req);
2429                        break;
2430                case OBJ_OP_DISCARD:
2431                        ret = rbd_obj_init_discard(obj_req);
2432                        break;
2433                case OBJ_OP_ZEROOUT:
2434                        ret = rbd_obj_init_zeroout(obj_req);
2435                        break;
2436                default:
2437                        BUG();
2438                }
2439                if (ret < 0)
2440                        return ret;
2441                if (ret > 0) {
2442                        rbd_img_obj_request_del(img_req, obj_req);
2443                        continue;
2444                }
2445        }
2446
2447        img_req->state = RBD_IMG_START;
2448        return 0;
2449}
2450
2451union rbd_img_fill_iter {
2452        struct ceph_bio_iter    bio_iter;
2453        struct ceph_bvec_iter   bvec_iter;
2454};
2455
2456struct rbd_img_fill_ctx {
2457        enum obj_request_type   pos_type;
2458        union rbd_img_fill_iter *pos;
2459        union rbd_img_fill_iter iter;
2460        ceph_object_extent_fn_t set_pos_fn;
2461        ceph_object_extent_fn_t count_fn;
2462        ceph_object_extent_fn_t copy_fn;
2463};
2464
2465static struct ceph_object_extent *alloc_object_extent(void *arg)
2466{
2467        struct rbd_img_request *img_req = arg;
2468        struct rbd_obj_request *obj_req;
2469
2470        obj_req = rbd_obj_request_create();
2471        if (!obj_req)
2472                return NULL;
2473
2474        rbd_img_obj_request_add(img_req, obj_req);
2475        return &obj_req->ex;
2476}
2477
2478/*
2479 * While su != os && sc == 1 is technically not fancy (it's the same
2480 * layout as su == os && sc == 1), we can't use the nocopy path for it
2481 * because ->set_pos_fn() should be called only once per object.
2482 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2483 * treat su != os && sc == 1 as fancy.
2484 */
2485static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2486{
2487        return l->stripe_unit != l->object_size;
2488}
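
/*
 * For example, the default layout (stripe_unit == object_size, e.g. both
 * 4M, stripe_count == 1) is not fancy and takes the nocopy path, whereas
 * an image striped with stripe_unit = 64K is fancy and forces the
 * OBJ_REQUEST_OWN_BVECS copy path in rbd_img_fill_request() below.
 */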
2489
2490static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2491                                       struct ceph_file_extent *img_extents,
2492                                       u32 num_img_extents,
2493                                       struct rbd_img_fill_ctx *fctx)
2494{
2495        u32 i;
2496        int ret;
2497
2498        img_req->data_type = fctx->pos_type;
2499
2500        /*
2501         * Create object requests and set each object request's starting
2502         * position in the provided bio (list) or bio_vec array.
2503         */
2504        fctx->iter = *fctx->pos;
2505        for (i = 0; i < num_img_extents; i++) {
2506                ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2507                                           img_extents[i].fe_off,
2508                                           img_extents[i].fe_len,
2509                                           &img_req->object_extents,
2510                                           alloc_object_extent, img_req,
2511                                           fctx->set_pos_fn, &fctx->iter);
2512                if (ret)
2513                        return ret;
2514        }
2515
2516        return __rbd_img_fill_request(img_req);
2517}
2518
2519/*
2520 * Map a list of image extents to a list of object extents, create the
2521 * corresponding object requests (normally each to a different object,
2522 * but not always) and add them to @img_req.  For each object request,
2523 * set up its data descriptor to point to the corresponding chunk(s) of
2524 * @fctx->pos data buffer.
2525 *
2526 * Because ceph_file_to_extents() will merge adjacent object extents
2527 * together, each object request's data descriptor may point to multiple
2528 * different chunks of @fctx->pos data buffer.
2529 *
2530 * @fctx->pos data buffer is assumed to be large enough.
2531 */
2532static int rbd_img_fill_request(struct rbd_img_request *img_req,
2533                                struct ceph_file_extent *img_extents,
2534                                u32 num_img_extents,
2535                                struct rbd_img_fill_ctx *fctx)
2536{
2537        struct rbd_device *rbd_dev = img_req->rbd_dev;
2538        struct rbd_obj_request *obj_req;
2539        u32 i;
2540        int ret;
2541
2542        if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2543            !rbd_layout_is_fancy(&rbd_dev->layout))
2544                return rbd_img_fill_request_nocopy(img_req, img_extents,
2545                                                   num_img_extents, fctx);
2546
2547        img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2548
2549        /*
2550         * Create object requests and determine ->bvec_count for each object
2551         * request.  Note that ->bvec_count sum over all object requests may
2552         * be greater than the number of bio_vecs in the provided bio (list)
2553         * or bio_vec array because when mapped, those bio_vecs can straddle
2554         * stripe unit boundaries.
2555         */
2556        fctx->iter = *fctx->pos;
2557        for (i = 0; i < num_img_extents; i++) {
2558                ret = ceph_file_to_extents(&rbd_dev->layout,
2559                                           img_extents[i].fe_off,
2560                                           img_extents[i].fe_len,
2561                                           &img_req->object_extents,
2562                                           alloc_object_extent, img_req,
2563                                           fctx->count_fn, &fctx->iter);
2564                if (ret)
2565                        return ret;
2566        }
2567
2568        for_each_obj_request(img_req, obj_req) {
2569                obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2570                                              sizeof(*obj_req->bvec_pos.bvecs),
2571                                              GFP_NOIO);
2572                if (!obj_req->bvec_pos.bvecs)
2573                        return -ENOMEM;
2574        }
2575
2576        /*
2577         * Fill in each object request's private bio_vec array, splitting and
2578         * rearranging the provided bio_vecs in stripe unit chunks as needed.
2579         */
2580        fctx->iter = *fctx->pos;
2581        for (i = 0; i < num_img_extents; i++) {
2582                ret = ceph_iterate_extents(&rbd_dev->layout,
2583                                           img_extents[i].fe_off,
2584                                           img_extents[i].fe_len,
2585                                           &img_req->object_extents,
2586                                           fctx->copy_fn, &fctx->iter);
2587                if (ret)
2588                        return ret;
2589        }
2590
2591        return __rbd_img_fill_request(img_req);
2592}
2593
2594static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2595                               u64 off, u64 len)
2596{
2597        struct ceph_file_extent ex = { off, len };
2598        union rbd_img_fill_iter dummy = {};
2599        struct rbd_img_fill_ctx fctx = {
2600                .pos_type = OBJ_REQUEST_NODATA,
2601                .pos = &dummy,
2602        };
2603
2604        return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2605}
2606
2607static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2608{
2609        struct rbd_obj_request *obj_req =
2610            container_of(ex, struct rbd_obj_request, ex);
2611        struct ceph_bio_iter *it = arg;
2612
2613        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2614        obj_req->bio_pos = *it;
2615        ceph_bio_iter_advance(it, bytes);
2616}
2617
2618static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2619{
2620        struct rbd_obj_request *obj_req =
2621            container_of(ex, struct rbd_obj_request, ex);
2622        struct ceph_bio_iter *it = arg;
2623
2624        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2625        ceph_bio_iter_advance_step(it, bytes, ({
2626                obj_req->bvec_count++;
2627        }));
2629}
2630
2631static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2632{
2633        struct rbd_obj_request *obj_req =
2634            container_of(ex, struct rbd_obj_request, ex);
2635        struct ceph_bio_iter *it = arg;
2636
2637        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2638        ceph_bio_iter_advance_step(it, bytes, ({
2639                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2640                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2641        }));
2642}
2643
2644static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2645                                   struct ceph_file_extent *img_extents,
2646                                   u32 num_img_extents,
2647                                   struct ceph_bio_iter *bio_pos)
2648{
2649        struct rbd_img_fill_ctx fctx = {
2650                .pos_type = OBJ_REQUEST_BIO,
2651                .pos = (union rbd_img_fill_iter *)bio_pos,
2652                .set_pos_fn = set_bio_pos,
2653                .count_fn = count_bio_bvecs,
2654                .copy_fn = copy_bio_bvecs,
2655        };
2656
2657        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2658                                    &fctx);
2659}
2660
2661static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2662                                 u64 off, u64 len, struct bio *bio)
2663{
2664        struct ceph_file_extent ex = { off, len };
2665        struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2666
2667        return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2668}
2669
2670static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2671{
2672        struct rbd_obj_request *obj_req =
2673            container_of(ex, struct rbd_obj_request, ex);
2674        struct ceph_bvec_iter *it = arg;
2675
2676        obj_req->bvec_pos = *it;
2677        ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2678        ceph_bvec_iter_advance(it, bytes);
2679}
2680
2681static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2682{
2683        struct rbd_obj_request *obj_req =
2684            container_of(ex, struct rbd_obj_request, ex);
2685        struct ceph_bvec_iter *it = arg;
2686
2687        ceph_bvec_iter_advance_step(it, bytes, ({
2688                obj_req->bvec_count++;
2689        }));
2690}
2691
2692static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2693{
2694        struct rbd_obj_request *obj_req =
2695            container_of(ex, struct rbd_obj_request, ex);
2696        struct ceph_bvec_iter *it = arg;
2697
2698        ceph_bvec_iter_advance_step(it, bytes, ({
2699                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2700                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2701        }));
2702}
2703
2704static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2705                                     struct ceph_file_extent *img_extents,
2706                                     u32 num_img_extents,
2707                                     struct ceph_bvec_iter *bvec_pos)
2708{
2709        struct rbd_img_fill_ctx fctx = {
2710                .pos_type = OBJ_REQUEST_BVECS,
2711                .pos = (union rbd_img_fill_iter *)bvec_pos,
2712                .set_pos_fn = set_bvec_pos,
2713                .count_fn = count_bvecs,
2714                .copy_fn = copy_bvecs,
2715        };
2716
2717        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2718                                    &fctx);
2719}
2720
2721static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2722                                   struct ceph_file_extent *img_extents,
2723                                   u32 num_img_extents,
2724                                   struct bio_vec *bvecs)
2725{
2726        struct ceph_bvec_iter it = {
2727                .bvecs = bvecs,
2728                .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2729                                                             num_img_extents) },
2730        };
2731
2732        return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2733                                         &it);
2734}
2735
2736static void rbd_img_handle_request_work(struct work_struct *work)
2737{
2738        struct rbd_img_request *img_req =
2739            container_of(work, struct rbd_img_request, work);
2740
2741        rbd_img_handle_request(img_req, img_req->work_result);
2742}
2743
2744static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2745{
2746        INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2747        img_req->work_result = result;
2748        queue_work(rbd_wq, &img_req->work);
2749}
2750
2751static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2752{
2753        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2754
2755        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2756                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2757                return true;
2758        }
2759
2760        dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2761             obj_req->ex.oe_objno);
2762        return false;
2763}
2764
2765static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2766{
2767        struct ceph_osd_request *osd_req;
2768        int ret;
2769
2770        osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2771        if (IS_ERR(osd_req))
2772                return PTR_ERR(osd_req);
2773
2774        osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2775                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2776        rbd_osd_setup_data(osd_req, 0);
2777        rbd_osd_format_read(osd_req);
2778
2779        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2780        if (ret)
2781                return ret;
2782
2783        rbd_osd_submit(osd_req);
2784        return 0;
2785}
2786
2787static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2788{
2789        struct rbd_img_request *img_req = obj_req->img_request;
2790        struct rbd_device *parent = img_req->rbd_dev->parent;
2791        struct rbd_img_request *child_img_req;
2792        int ret;
2793
2794        child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2795        if (!child_img_req)
2796                return -ENOMEM;
2797
2798        rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2799        __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2800        child_img_req->obj_request = obj_req;
2801
2802        down_read(&parent->header_rwsem);
2803        rbd_img_capture_header(child_img_req);
2804        up_read(&parent->header_rwsem);
2805
2806        dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2807             obj_req);
2808
2809        if (!rbd_img_is_write(img_req)) {
2810                switch (img_req->data_type) {
2811                case OBJ_REQUEST_BIO:
2812                        ret = __rbd_img_fill_from_bio(child_img_req,
2813                                                      obj_req->img_extents,
2814                                                      obj_req->num_img_extents,
2815                                                      &obj_req->bio_pos);
2816                        break;
2817                case OBJ_REQUEST_BVECS:
2818                case OBJ_REQUEST_OWN_BVECS:
2819                        ret = __rbd_img_fill_from_bvecs(child_img_req,
2820                                                      obj_req->img_extents,
2821                                                      obj_req->num_img_extents,
2822                                                      &obj_req->bvec_pos);
2823                        break;
2824                default:
2825                        BUG();
2826                }
2827        } else {
2828                ret = rbd_img_fill_from_bvecs(child_img_req,
2829                                              obj_req->img_extents,
2830                                              obj_req->num_img_extents,
2831                                              obj_req->copyup_bvecs);
2832        }
2833        if (ret) {
2834                rbd_img_request_destroy(child_img_req);
2835                return ret;
2836        }
2837
2838        /* avoid parent chain recursion */
2839        rbd_img_schedule(child_img_req, 0);
2840        return 0;
2841}
2842
2843static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2844{
2845        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2846        int ret;
2847
2848again:
2849        switch (obj_req->read_state) {
2850        case RBD_OBJ_READ_START:
2851                rbd_assert(!*result);
2852
2853                if (!rbd_obj_may_exist(obj_req)) {
2854                        *result = -ENOENT;
2855                        obj_req->read_state = RBD_OBJ_READ_OBJECT;
2856                        goto again;
2857                }
2858
2859                ret = rbd_obj_read_object(obj_req);
2860                if (ret) {
2861                        *result = ret;
2862                        return true;
2863                }
2864                obj_req->read_state = RBD_OBJ_READ_OBJECT;
2865                return false;
2866        case RBD_OBJ_READ_OBJECT:
2867                if (*result == -ENOENT && rbd_dev->parent_overlap) {
2868                        /* reverse map this object extent onto the parent */
2869                        ret = rbd_obj_calc_img_extents(obj_req, false);
2870                        if (ret) {
2871                                *result = ret;
2872                                return true;
2873                        }
2874                        if (obj_req->num_img_extents) {
2875                                ret = rbd_obj_read_from_parent(obj_req);
2876                                if (ret) {
2877                                        *result = ret;
2878                                        return true;
2879                                }
2880                                obj_req->read_state = RBD_OBJ_READ_PARENT;
2881                                return false;
2882                        }
2883                }
2884
2885                /*
2886                 * -ENOENT means a hole in the image -- zero-fill the entire
2887                 * length of the request.  A short read also implies zero-fill
2888                 * to the end of the request.
2889                 */
2890                if (*result == -ENOENT) {
2891                        rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2892                        *result = 0;
2893                } else if (*result >= 0) {
2894                        if (*result < obj_req->ex.oe_len)
2895                                rbd_obj_zero_range(obj_req, *result,
2896                                                obj_req->ex.oe_len - *result);
2897                        else
2898                                rbd_assert(*result == obj_req->ex.oe_len);
2899                        *result = 0;
2900                }
2901                return true;
2902        case RBD_OBJ_READ_PARENT:
2903                /*
2904                 * The parent image is read only up to the overlap -- zero-fill
2905                 * from the overlap to the end of the request.
2906                 */
2907                if (!*result) {
2908                        u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2909
2910                        if (obj_overlap < obj_req->ex.oe_len)
2911                                rbd_obj_zero_range(obj_req, obj_overlap,
2912                                            obj_req->ex.oe_len - obj_overlap);
2913                }
2914                return true;
2915        default:
2916                BUG();
2917        }
2918}
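
/*
 * Read state machine summary (as implemented above):
 *
 *   RBD_OBJ_READ_START  -> RBD_OBJ_READ_OBJECT: issue the object read,
 *                          or synthesize -ENOENT if the object map says
 *                          the object doesn't exist
 *   RBD_OBJ_READ_OBJECT -> RBD_OBJ_READ_PARENT: on -ENOENT with a parent
 *                          overlap, read the covered range from the parent
 *   otherwise:             zero-fill holes and short reads, then complete
 */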
2919
2920static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2921{
2922        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2923
2924        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2925                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2926
2927        if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2928            (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2929                dout("%s %p noop for nonexistent\n", __func__, obj_req);
2930                return true;
2931        }
2932
2933        return false;
2934}
2935
2936/*
2937 * Return:
2938 *   0 - object map update sent
2939 *   1 - object map update isn't needed
2940 *  <0 - error
2941 */
2942static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2943{
2944        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2945        u8 new_state;
2946
2947        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2948                return 1;
2949
2950        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2951                new_state = OBJECT_PENDING;
2952        else
2953                new_state = OBJECT_EXISTS;
2954
2955        return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2956}
2957
2958static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2959{
2960        struct ceph_osd_request *osd_req;
2961        int num_ops = count_write_ops(obj_req);
2962        int which = 0;
2963        int ret;
2964
2965        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2966                num_ops++; /* stat */
2967
2968        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2969        if (IS_ERR(osd_req))
2970                return PTR_ERR(osd_req);
2971
2972        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2973                ret = rbd_osd_setup_stat(osd_req, which++);
2974                if (ret)
2975                        return ret;
2976        }
2977
2978        rbd_osd_setup_write_ops(osd_req, which);
2979        rbd_osd_format_write(osd_req);
2980
2981        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2982        if (ret)
2983                return ret;
2984
2985        rbd_osd_submit(osd_req);
2986        return 0;
2987}
2988
2989/*
2990 * copyup_bvecs pages are never highmem pages
2991 */
2992static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2993{
2994        struct ceph_bvec_iter it = {
2995                .bvecs = bvecs,
2996                .iter = { .bi_size = bytes },
2997        };
2998
2999        ceph_bvec_iter_advance_step(&it, bytes, ({
3000                if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3001                               bv.bv_len))
3002                        return false;
3003        }));
3004        return true;
3005}
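
/*
 * is_zero_bvecs() lets the copyup state machine notice that the data read
 * from the parent is all zeros, in which case the copyup payload can be
 * skipped (see RBD_OBJ_FLAG_COPYUP_ZEROS) and only the modification ops
 * need to be sent.
 */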
3006
3007#define MODS_ONLY       U32_MAX
3008
3009static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3010                                      u32 bytes)
3011{
3012        struct ceph_osd_request *osd_req;
3013        int ret;
3014
3015        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3016        rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3017
3018        osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3019        if (IS_ERR(osd_req))
3020                return PTR_ERR(osd_req);
3021
3022        ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3023        if (ret)
3024                return ret;
3025
3026        rbd_osd_format_write(osd_req);
3027
3028        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3029        if (ret)
3030                return ret;
3031
3032        rbd_osd_submit(osd_req);
3033        return 0;
3034}
3035
3036static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3037                                        u32 bytes)
3038{
3039        struct ceph_osd_request *osd_req;
3040        int num_ops = count_write_ops(obj_req);
3041        int which = 0;
3042        int ret;
3043
3044        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3045
3046        if (bytes != MODS_ONLY)
3047                num_ops++; /* copyup */
3048
3049        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3050        if (IS_ERR(osd_req))
3051                return PTR_ERR(osd_req);
3052
3053        if (bytes != MODS_ONLY) {
3054                ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3055                if (ret)
3056                        return ret;
3057        }
3058
3059        rbd_osd_setup_write_ops(osd_req, which);
3060        rbd_osd_format_write(osd_req);
3061
3062        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3063        if (ret)
3064                return ret;
3065
3066        rbd_osd_submit(osd_req);
3067        return 0;
3068}
3069
3070static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3071{
3072        u32 i;
3073
3074        rbd_assert(!obj_req->copyup_bvecs);
3075        obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3076        obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3077                                        sizeof(*obj_req->copyup_bvecs),
3078                                        GFP_NOIO);
3079        if (!obj_req->copyup_bvecs)
3080                return -ENOMEM;
3081
3082        for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3083                unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3084
3085                obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3086                if (!obj_req->copyup_bvecs[i].bv_page)
3087                        return -ENOMEM;
3088
3089                obj_req->copyup_bvecs[i].bv_offset = 0;
3090                obj_req->copyup_bvecs[i].bv_len = len;
3091                obj_overlap -= len;
3092        }
3093
3094        rbd_assert(!obj_overlap);
3095        return 0;
3096}
3097
3098/*
3099 * The target object doesn't exist.  Read the data for the entire
3100 * target object up to the overlap point (if any) from the parent,
3101 * so we can use it for a copyup.
3102 */
3103static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3104{
3105        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3106        int ret;
3107
3108        rbd_assert(obj_req->num_img_extents);
3109        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3110                      rbd_dev->parent_overlap);
3111        if (!obj_req->num_img_extents) {
3112                /*
3113                 * The overlap has become 0 (most likely because the
3114                 * image has been flattened).  Re-submit the original write
3115                 * request -- pass MODS_ONLY since the copyup isn't needed
3116                 * anymore.
3117                 */
3118                return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3119        }
3120
3121        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3122        if (ret)
3123                return ret;
3124
3125        return rbd_obj_read_from_parent(obj_req);
3126}
3127
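/*
 * Mark the object as existing in the object map of each snapshot in the
 * write's snapshot context.  With fast-diff, all but the newest snapshot
 * get OBJECT_EXISTS_CLEAN; otherwise OBJECT_EXISTS is used.  Skipped
 * entirely when the copyup data is all zeros.
 */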
3128static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3129{
3130        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3131        struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3132        u8 new_state;
3133        u32 i;
3134        int ret;
3135
3136        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3137
3138        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3139                return;
3140
3141        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3142                return;
3143
3144        for (i = 0; i < snapc->num_snaps; i++) {
3145                if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3146                    i + 1 < snapc->num_snaps)
3147                        new_state = OBJECT_EXISTS_CLEAN;
3148                else
3149                        new_state = OBJECT_EXISTS;
3150
3151                ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3152                                            new_state, NULL);
3153                if (ret < 0) {
3154                        obj_req->pending.result = ret;
3155                        return;
3156                }
3157
3158                rbd_assert(!ret);
3159                obj_req->pending.num_pending++;
3160        }
3161}
3162
3163static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3164{
3165        u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3166        int ret;
3167
3168        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3169
3170        /*
3171         * Only send non-zero copyup data to save some I/O and network
3172         * bandwidth -- zero copyup data is equivalent to the object not
3173         * existing.
3174         */
3175        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3176                bytes = 0;
3177
3178        if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3179                /*
3180                 * Send a copyup request with an empty snapshot context to
3181                 * deep-copyup the object through all existing snapshots.
3182                 * A second request with the current snapshot context will be
3183                 * sent for the actual modification.
3184                 */
3185                ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3186                if (ret) {
3187                        obj_req->pending.result = ret;
3188                        return;
3189                }
3190
3191                obj_req->pending.num_pending++;
3192                bytes = MODS_ONLY;
3193        }
3194
3195        ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3196        if (ret) {
3197                obj_req->pending.result = ret;
3198                return;
3199        }
3200
3201        obj_req->pending.num_pending++;
3202}
3203
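/*
 * Copyup sub-state machine, entered from RBD_OBJ_WRITE_OBJECT when the
 * write fails with -ENOENT and copyup is enabled.  Roughly:
 *
 *   COPYUP_START        -> read parent data (unless the overlap is gone)
 *   COPYUP_READ_PARENT  -> update snapshot object maps (if any)
 *   COPYUP_OBJECT_MAPS  -> send copyup and modification requests
 *   COPYUP_WRITE_OBJECT -> done
 *
 * Returns true when the sequence is complete, with any error in *result.
 */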
3204static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3205{
3206        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3207        int ret;
3208
3209again:
3210        switch (obj_req->copyup_state) {
3211        case RBD_OBJ_COPYUP_START:
3212                rbd_assert(!*result);
3213
3214                ret = rbd_obj_copyup_read_parent(obj_req);
3215                if (ret) {
3216                        *result = ret;
3217                        return true;
3218                }
3219                if (obj_req->num_img_extents)
3220                        obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3221                else
3222                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3223                return false;
3224        case RBD_OBJ_COPYUP_READ_PARENT:
3225                if (*result)
3226                        return true;
3227
3228                if (is_zero_bvecs(obj_req->copyup_bvecs,
3229                                  rbd_obj_img_extents_bytes(obj_req))) {
3230                        dout("%s %p detected zeros\n", __func__, obj_req);
3231                        obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3232                }
3233
3234                rbd_obj_copyup_object_maps(obj_req);
3235                if (!obj_req->pending.num_pending) {
3236                        *result = obj_req->pending.result;
3237                        obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3238                        goto again;
3239                }
3240                obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3241                return false;
3242        case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3243                if (!pending_result_dec(&obj_req->pending, result))
3244                        return false;
3245                fallthrough;
3246        case RBD_OBJ_COPYUP_OBJECT_MAPS:
3247                if (*result) {
3248                        rbd_warn(rbd_dev, "snap object map update failed: %d",
3249                                 *result);
3250                        return true;
3251                }
3252
3253                rbd_obj_copyup_write_object(obj_req);
3254                if (!obj_req->pending.num_pending) {
3255                        *result = obj_req->pending.result;
3256                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3257                        goto again;
3258                }
3259                obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3260                return false;
3261        case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3262                if (!pending_result_dec(&obj_req->pending, result))
3263                        return false;
3264                fallthrough;
3265        case RBD_OBJ_COPYUP_WRITE_OBJECT:
3266                return true;
3267        default:
3268                BUG();
3269        }
3270}
3271
3272/*
3273 * Return:
3274 *   0 - object map update sent
3275 *   1 - object map update isn't needed
3276 *  <0 - error
3277 */
3278static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3279{
3280        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3281        u8 current_state = OBJECT_PENDING;
3282
3283        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3284                return 1;
3285
3286        if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3287                return 1;
3288
3289        return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3290                                     &current_state);
3291}
3292
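/*
 * Object write state machine: update the object map (if enabled), issue
 * the write, divert into the copyup sub-state machine on -ENOENT when
 * copyup is enabled, and finally update the object map again for
 * deletions.  Returns true once the object request is done.
 */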
3293static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3294{
3295        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3296        int ret;
3297
3298again:
3299        switch (obj_req->write_state) {
3300        case RBD_OBJ_WRITE_START:
3301                rbd_assert(!*result);
3302
3303                if (rbd_obj_write_is_noop(obj_req))
3304                        return true;
3305
3306                ret = rbd_obj_write_pre_object_map(obj_req);
3307                if (ret < 0) {
3308                        *result = ret;
3309                        return true;
3310                }
3311                obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3312                if (ret > 0)
3313                        goto again;
3314                return false;
3315        case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3316                if (*result) {
3317                        rbd_warn(rbd_dev, "pre object map update failed: %d",
3318                                 *result);
3319                        return true;
3320                }
3321                ret = rbd_obj_write_object(obj_req);
3322                if (ret) {
3323                        *result = ret;
3324                        return true;
3325                }
3326                obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3327                return false;
3328        case RBD_OBJ_WRITE_OBJECT:
3329                if (*result == -ENOENT) {
3330                        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3331                                *result = 0;
3332                                obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3333                                obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3334                                goto again;
3335                        }
3336                        /*
3337                         * On a non-existent object: delete returns -ENOENT,
3338                         * truncate/zero return 0.  Squash -ENOENT for deletions.
3339                         */
3340                        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3341                                *result = 0;
3342                }
3343                if (*result)
3344                        return true;
3345
3346                obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3347                goto again;
3348        case __RBD_OBJ_WRITE_COPYUP:
3349                if (!rbd_obj_advance_copyup(obj_req, result))
3350                        return false;
3351                fallthrough;
3352        case RBD_OBJ_WRITE_COPYUP:
3353                if (*result) {
3354                        rbd_warn(rbd_dev, "copyup failed: %d", *result);
3355                        return true;
3356                }
3357                ret = rbd_obj_write_post_object_map(obj_req);
3358                if (ret < 0) {
3359                        *result = ret;
3360                        return true;
3361                }
3362                obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3363                if (ret > 0)
3364                        goto again;
3365                return false;
3366        case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3367                if (*result)
3368                        rbd_warn(rbd_dev, "post object map update failed: %d",
3369                                 *result);
3370                return true;
3371        default:
3372                BUG();
3373        }
3374}
3375
3376/*
3377 * Return true if @obj_req is completed.
3378 */
3379static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3380                                     int *result)
3381{
3382        struct rbd_img_request *img_req = obj_req->img_request;
3383        struct rbd_device *rbd_dev = img_req->rbd_dev;
3384        bool done;
3385
3386        mutex_lock(&obj_req->state_mutex);
3387        if (!rbd_img_is_write(img_req))
3388                done = rbd_obj_advance_read(obj_req, result);
3389        else
3390                done = rbd_obj_advance_write(obj_req, result);
3391        mutex_unlock(&obj_req->state_mutex);
3392
3393        if (done && *result) {
3394                rbd_assert(*result < 0);
3395                rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3396                         obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3397                         obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3398        }
3399        return done;
3400}
3401
3402/*
3403 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3404 * recursion.
3405 */
3406static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3407{
3408        if (__rbd_obj_handle_request(obj_req, &result))
3409                rbd_img_handle_request(obj_req->img_request, result);
3410}
3411
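/*
 * With the exclusive-lock feature enabled, the lock is required for all
 * writes and, if lock_on_read is set or the object map is enabled, for
 * reads as well.  Read-only mappings never take it.
 */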
3412static bool need_exclusive_lock(struct rbd_img_request *img_req)
3413{
3414        struct rbd_device *rbd_dev = img_req->rbd_dev;
3415
3416        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3417                return false;
3418
3419        if (rbd_is_ro(rbd_dev))
3420                return false;
3421
3422        rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3423        if (rbd_dev->opts->lock_on_read ||
3424            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3425                return true;
3426
3427        return rbd_img_is_write(img_req);
3428}
3429
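/*
 * Add @img_req to acquiring_list (lock not held yet -- the request will
 * be kicked by wake_lock_waiters()) or running_list (lock held, proceed
 * immediately).  Returns true if the lock is currently held.
 */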
3430static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3431{
3432        struct rbd_device *rbd_dev = img_req->rbd_dev;
3433        bool locked;
3434
3435        lockdep_assert_held(&rbd_dev->lock_rwsem);
3436        locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3437        spin_lock(&rbd_dev->lock_lists_lock);
3438        rbd_assert(list_empty(&img_req->lock_item));
3439        if (!locked)
3440                list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3441        else
3442                list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3443        spin_unlock(&rbd_dev->lock_lists_lock);
3444        return locked;
3445}
3446
3447static void rbd_lock_del_request(struct rbd_img_request *img_req)
3448{
3449        struct rbd_device *rbd_dev = img_req->rbd_dev;
3450        bool need_wakeup;
3451
3452        lockdep_assert_held(&rbd_dev->lock_rwsem);
3453        spin_lock(&rbd_dev->lock_lists_lock);
3454        rbd_assert(!list_empty(&img_req->lock_item));
3455        list_del_init(&img_req->lock_item);
3456        need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3457                       list_empty(&rbd_dev->running_list));
3458        spin_unlock(&rbd_dev->lock_lists_lock);
3459        if (need_wakeup)
3460                complete(&rbd_dev->releasing_wait);
3461}
3462
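/*
 * Return:
 *   0 - lock acquisition queued, the request waits in acquiring_list
 *   1 - no lock needed or lock already owned, proceed immediately
 *  <0 - error
 */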
3463static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3464{
3465        struct rbd_device *rbd_dev = img_req->rbd_dev;
3466
3467        if (!need_exclusive_lock(img_req))
3468                return 1;
3469
3470        if (rbd_lock_add_request(img_req))
3471                return 1;
3472
3473        if (rbd_dev->opts->exclusive) {
3474                WARN_ON(1); /* lock got released? */
3475                return -EROFS;
3476        }
3477
3478        /*
3479         * Note the use of mod_delayed_work() in rbd_acquire_lock()
3480         * and cancel_delayed_work() in wake_lock_waiters().
3481         */
3482        dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3483        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3484        return 0;
3485}
3486
3487static void rbd_img_object_requests(struct rbd_img_request *img_req)
3488{
3489        struct rbd_obj_request *obj_req;
3490
3491        rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3492
3493        for_each_obj_request(img_req, obj_req) {
3494                int result = 0;
3495
3496                if (__rbd_obj_handle_request(obj_req, &result)) {
3497                        if (result) {
3498                                img_req->pending.result = result;
3499                                return;
3500                        }
3501                } else {
3502                        img_req->pending.num_pending++;
3503                }
3504        }
3505}
3506
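/*
 * Image request state machine: take the exclusive lock if needed, then
 * kick all object requests and wait for them to complete.  Returns true
 * when the image request is done.
 */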
3507static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3508{
3509        struct rbd_device *rbd_dev = img_req->rbd_dev;
3510        int ret;
3511
3512again:
3513        switch (img_req->state) {
3514        case RBD_IMG_START:
3515                rbd_assert(!*result);
3516
3517                ret = rbd_img_exclusive_lock(img_req);
3518                if (ret < 0) {
3519                        *result = ret;
3520                        return true;
3521                }
3522                img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3523                if (ret > 0)
3524                        goto again;
3525                return false;
3526        case RBD_IMG_EXCLUSIVE_LOCK:
3527                if (*result)
3528                        return true;
3529
3530                rbd_assert(!need_exclusive_lock(img_req) ||
3531                           __rbd_is_lock_owner(rbd_dev));
3532
3533                rbd_img_object_requests(img_req);
3534                if (!img_req->pending.num_pending) {
3535                        *result = img_req->pending.result;
3536                        img_req->state = RBD_IMG_OBJECT_REQUESTS;
3537                        goto again;
3538                }
3539                img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3540                return false;
3541        case __RBD_IMG_OBJECT_REQUESTS:
3542                if (!pending_result_dec(&img_req->pending, result))
3543                        return false;
3544                fallthrough;
3545        case RBD_IMG_OBJECT_REQUESTS:
3546                return true;
3547        default:
3548                BUG();
3549        }
3550}
3551
3552/*
3553 * Return true if @img_req is completed.
3554 */
3555static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3556                                     int *result)
3557{
3558        struct rbd_device *rbd_dev = img_req->rbd_dev;
3559        bool done;
3560
3561        if (need_exclusive_lock(img_req)) {
3562                down_read(&rbd_dev->lock_rwsem);
3563                mutex_lock(&img_req->state_mutex);
3564                done = rbd_img_advance(img_req, result);
3565                if (done)
3566                        rbd_lock_del_request(img_req);
3567                mutex_unlock(&img_req->state_mutex);
3568                up_read(&rbd_dev->lock_rwsem);
3569        } else {
3570                mutex_lock(&img_req->state_mutex);
3571                done = rbd_img_advance(img_req, result);
3572                mutex_unlock(&img_req->state_mutex);
3573        }
3574
3575        if (done && *result) {
3576                rbd_assert(*result < 0);
3577                rbd_warn(rbd_dev, "%s%s result %d",
3578                      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3579                      obj_op_name(img_req->op_type), *result);
3580        }
3581        return done;
3582}
3583
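/*
 * Completion is iterative rather than recursive: when a child (parent
 * read) image request finishes, the object request that spawned it is
 * advanced right here and we loop instead of calling back up the chain.
 */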
3584static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3585{
3586again:
3587        if (!__rbd_img_handle_request(img_req, &result))
3588                return;
3589
3590        if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3591                struct rbd_obj_request *obj_req = img_req->obj_request;
3592
3593                rbd_img_request_destroy(img_req);
3594                if (__rbd_obj_handle_request(obj_req, &result)) {
3595                        img_req = obj_req->img_request;
3596                        goto again;
3597                }
3598        } else {
3599                struct request *rq = blk_mq_rq_from_pdu(img_req);
3600
3601                rbd_img_request_destroy(img_req);
3602                blk_mq_end_request(rq, errno_to_blk_status(result));
3603        }
3604}
3605
3606static const struct rbd_client_id rbd_empty_cid;
3607
3608static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3609                          const struct rbd_client_id *rhs)
3610{
3611        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3612}
3613
3614static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3615{
3616        struct rbd_client_id cid;
3617
3618        mutex_lock(&rbd_dev->watch_mutex);
3619        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3620        cid.handle = rbd_dev->watch_cookie;
3621        mutex_unlock(&rbd_dev->watch_mutex);
3622        return cid;
3623}
3624
3625/*
3626 * lock_rwsem must be held for write
3627 */
3628static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3629                              const struct rbd_client_id *cid)
3630{
3631        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3632             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3633             cid->gid, cid->handle);
3634        rbd_dev->owner_cid = *cid; /* struct */
3635}
3636
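/*
 * The lock cookie is "<RBD_LOCK_COOKIE_PREFIX> <watch_cookie>".  Tying
 * the cookie to the watch lets find_watcher() tell a live lock owner
 * from a stale one.
 */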
3637static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3638{
3639        mutex_lock(&rbd_dev->watch_mutex);
3640        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3641        mutex_unlock(&rbd_dev->watch_mutex);
3642}
3643
3644static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3645{
3646        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3647
3648        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3649        strcpy(rbd_dev->lock_cookie, cookie);
3650        rbd_set_owner_cid(rbd_dev, &cid);
3651        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3652}
3653
3654/*
3655 * lock_rwsem must be held for write
3656 */
3657static int rbd_lock(struct rbd_device *rbd_dev)
3658{
3659        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3660        char cookie[32];
3661        int ret;
3662
3663        WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3664                rbd_dev->lock_cookie[0] != '\0');
3665
3666        format_lock_cookie(rbd_dev, cookie);
3667        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3668                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3669                            RBD_LOCK_TAG, "", 0);
3670        if (ret)
3671                return ret;
3672
3673        __rbd_lock(rbd_dev, cookie);
3674        return 0;
3675}
3676
3677/*
3678 * lock_rwsem must be held for write
3679 */
3680static void rbd_unlock(struct rbd_device *rbd_dev)
3681{
3682        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3683        int ret;
3684
3685        WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3686                rbd_dev->lock_cookie[0] == '\0');
3687
3688        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3689                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
3690        if (ret && ret != -ENOENT)
3691                rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3692
3693        /* on error, still treat the image as unlocked */
3694        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3695        rbd_dev->lock_cookie[0] = '\0';
3696        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3697        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3698}
3699
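/*
 * The encoded NotifyMessage payload is notify_op (u32) followed by
 * ClientId (two u64s: gid and handle), wrapped in a ceph encoding
 * envelope -- hence the 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN buffer.
 */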
3700static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3701                                enum rbd_notify_op notify_op,
3702                                struct page ***preply_pages,
3703                                size_t *preply_len)
3704{
3705        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3706        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3707        char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3708        int buf_size = sizeof(buf);
3709        void *p = buf;
3710
3711        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3712
3713        /* encode *LockPayload NotifyMessage (op + ClientId) */
3714        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3715        ceph_encode_32(&p, notify_op);
3716        ceph_encode_64(&p, cid.gid);
3717        ceph_encode_64(&p, cid.handle);
3718
3719        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3720                                &rbd_dev->header_oloc, buf, buf_size,
3721                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3722}
3723
3724static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3725                               enum rbd_notify_op notify_op)
3726{
3727        __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3728}
3729
3730static void rbd_notify_acquired_lock(struct work_struct *work)
3731{
3732        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3733                                                  acquired_lock_work);
3734
3735        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3736}
3737
3738static void rbd_notify_released_lock(struct work_struct *work)
3739{
3740        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3741                                                  released_lock_work);
3742
3743        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3744}
3745
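/*
 * Ask the current lock owner to release the lock.  The notify reply is a
 * list of acks; a non-empty ack payload is a ResponseMessage from the
 * owner: 0 means it will release the lock, -EROFS means it refuses (see
 * rbd_handle_request_lock()).
 */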
3746static int rbd_request_lock(struct rbd_device *rbd_dev)
3747{
3748        struct page **reply_pages;
3749        size_t reply_len;
3750        bool lock_owner_responded = false;
3751        int ret;
3752
3753        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3754
3755        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3756                                   &reply_pages, &reply_len);
3757        if (ret && ret != -ETIMEDOUT) {
3758                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3759                goto out;
3760        }
3761
3762        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3763                void *p = page_address(reply_pages[0]);
3764                void *const end = p + reply_len;
3765                u32 n;
3766
3767                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3768                while (n--) {
3769                        u8 struct_v;
3770                        u32 len;
3771
3772                        ceph_decode_need(&p, end, 8 + 8, e_inval);
3773                        p += 8 + 8; /* skip gid and cookie */
3774
3775                        ceph_decode_32_safe(&p, end, len, e_inval);
3776                        if (!len)
3777                                continue;
3778
3779                        if (lock_owner_responded) {
3780                                rbd_warn(rbd_dev,
3781                                         "duplicate lock owners detected");
3782                                ret = -EIO;
3783                                goto out;
3784                        }
3785
3786                        lock_owner_responded = true;
3787                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3788                                                  &struct_v, &len);
3789                        if (ret) {
3790                                rbd_warn(rbd_dev,
3791                                         "failed to decode ResponseMessage: %d",
3792                                         ret);
3793                                goto e_inval;
3794                        }
3795
3796                        ret = ceph_decode_32(&p);
3797                }
3798        }
3799
3800        if (!lock_owner_responded) {
3801                rbd_warn(rbd_dev, "no lock owners detected");
3802                ret = -ETIMEDOUT;
3803        }
3804
3805out:
3806        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3807        return ret;
3808
3809e_inval:
3810        ret = -EINVAL;
3811        goto out;
3812}
3813
3814/*
3815 * Wake everything waiting for the exclusive lock: either image request
3816 * state machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
3817 */
3818static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3819{
3820        struct rbd_img_request *img_req;
3821
3822        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3823        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3824
3825        cancel_delayed_work(&rbd_dev->lock_dwork);
3826        if (!completion_done(&rbd_dev->acquire_wait)) {
3827                rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3828                           list_empty(&rbd_dev->running_list));
3829                rbd_dev->acquire_err = result;
3830                complete_all(&rbd_dev->acquire_wait);
3831                return;
3832        }
3833
3834        list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3835                mutex_lock(&img_req->state_mutex);
3836                rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3837                rbd_img_schedule(img_req, result);
3838                mutex_unlock(&img_req->state_mutex);
3839        }
3840
3841        list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3842}
3843
3844static int get_lock_owner_info(struct rbd_device *rbd_dev,
3845                               struct ceph_locker **lockers, u32 *num_lockers)
3846{
3847        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3848        u8 lock_type;
3849        char *lock_tag;
3850        int ret;
3851
3852        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3853
3854        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3855                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3856                                 &lock_type, &lock_tag, lockers, num_lockers);
3857        if (ret)
3858                return ret;
3859
3860        if (*num_lockers == 0) {
3861                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3862                goto out;
3863        }
3864
3865        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3866                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3867                         lock_tag);
3868                ret = -EBUSY;
3869                goto out;
3870        }
3871
3872        if (lock_type == CEPH_CLS_LOCK_SHARED) {
3873                rbd_warn(rbd_dev, "shared lock type detected");
3874                ret = -EBUSY;
3875                goto out;
3876        }
3877
3878        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3879                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3880                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3881                         (*lockers)[0].id.cookie);
3882                ret = -EBUSY;
3883                goto out;
3884        }
3885
3886out:
3887        kfree(lock_tag);
3888        return ret;
3889}
3890
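/*
 * Check whether the lock owner still has a watch established on the
 * header object.  Returns 1 (and records the owner's cid) if a matching
 * watcher is found, 0 if not, <0 on error.
 */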
3891static int find_watcher(struct rbd_device *rbd_dev,
3892                        const struct ceph_locker *locker)
3893{
3894        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3895        struct ceph_watch_item *watchers;
3896        u32 num_watchers;
3897        u64 cookie;
3898        int i;
3899        int ret;
3900
3901        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3902                                      &rbd_dev->header_oloc, &watchers,
3903                                      &num_watchers);
3904        if (ret)
3905                return ret;
3906
3907        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3908        for (i = 0; i < num_watchers; i++) {
3909                /*
3910                 * Ignore addr->type while comparing.  This mimics
3911                 * entity_addr_t::get_legacy_str() + strcmp().
3912                 */
3913                if (ceph_addr_equal_no_type(&watchers[i].addr,
3914                                            &locker->info.addr) &&
3915                    watchers[i].cookie == cookie) {
3916                        struct rbd_client_id cid = {
3917                                .gid = le64_to_cpu(watchers[i].name.num),
3918                                .handle = cookie,
3919                        };
3920
3921                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3922                             rbd_dev, cid.gid, cid.handle);
3923                        rbd_set_owner_cid(rbd_dev, &cid);
3924                        ret = 1;
3925                        goto out;
3926                }
3927        }
3928
3929        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3930        ret = 0;
3931out:
3932        kfree(watchers);
3933        return ret;
3934}
3935
3936/*
3937 * lock_rwsem must be held for write
3938 */
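/*
 * Attempt to acquire the lock, breaking it if the current holder appears
 * to be dead: a holder without a matching watch is blocklisted and its
 * lock is broken before retrying.
 */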
3939static int rbd_try_lock(struct rbd_device *rbd_dev)
3940{
3941        struct ceph_client *client = rbd_dev->rbd_client->client;
3942        struct ceph_locker *lockers;
3943        u32 num_lockers;
3944        int ret;
3945
3946        for (;;) {
3947                ret = rbd_lock(rbd_dev);
3948                if (ret != -EBUSY)
3949                        return ret;
3950
3951                /* determine if the current lock holder is still alive */
3952                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3953                if (ret)
3954                        return ret;
3955
3956                if (num_lockers == 0)
3957                        goto again;
3958
3959                ret = find_watcher(rbd_dev, lockers);
3960                if (ret)
3961                        goto out; /* request lock or error */
3962
3963                rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3964                         ENTITY_NAME(lockers[0].id.name));
3965
3966                ret = ceph_monc_blocklist_add(&client->monc,
3967                                              &lockers[0].info.addr);
3968                if (ret) {
3969                        rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3970                                 ENTITY_NAME(lockers[0].id.name), ret);
3971                        goto out;
3972                }
3973
3974                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3975                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
3976                                          lockers[0].id.cookie,
3977                                          &lockers[0].id.name);
3978                if (ret && ret != -ENOENT)
3979                        goto out;
3980
3981again:
3982                ceph_free_lockers(lockers, num_lockers);
3983        }
3984
3985out:
3986        ceph_free_lockers(lockers, num_lockers);
3987        return ret;
3988}
3989
3990static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
3991{
3992        int ret;
3993
3994        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
3995                ret = rbd_object_map_open(rbd_dev);
3996                if (ret)
3997                        return ret;
3998        }
3999
4000        return 0;
4001}
4002
4003/*
4004 * Return:
4005 *   0 - lock acquired
4006 *   1 - caller should call rbd_request_lock()
4007 *  <0 - error
4008 */
4009static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4010{
4011        int ret;
4012
4013        down_read(&rbd_dev->lock_rwsem);
4014        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4015             rbd_dev->lock_state);
4016        if (__rbd_is_lock_owner(rbd_dev)) {
4017                up_read(&rbd_dev->lock_rwsem);
4018                return 0;
4019        }
4020
4021        up_read(&rbd_dev->lock_rwsem);
4022        down_write(&rbd_dev->lock_rwsem);
4023        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4024             rbd_dev->lock_state);
4025        if (__rbd_is_lock_owner(rbd_dev)) {
4026                up_write(&rbd_dev->lock_rwsem);
4027                return 0;
4028        }
4029
4030        ret = rbd_try_lock(rbd_dev);
4031        if (ret < 0) {
4032                rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4033                if (ret == -EBLOCKLISTED)
4034                        goto out;
4035
4036                ret = 1; /* request lock anyway */
4037        }
4038        if (ret > 0) {
4039                up_write(&rbd_dev->lock_rwsem);
4040                return ret;
4041        }
4042
4043        rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4044        rbd_assert(list_empty(&rbd_dev->running_list));
4045
4046        ret = rbd_post_acquire_action(rbd_dev);
4047        if (ret) {
4048                rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4049                /*
4050                 * Can't stay in RBD_LOCK_STATE_LOCKED because
4051                 * rbd_lock_add_request() would let the request through,
4052                 * assuming that e.g. object map is locked and loaded.
4053                 */
4054                rbd_unlock(rbd_dev);
4055        }
4056
4057out:
4058        wake_lock_waiters(rbd_dev, ret);
4059        up_write(&rbd_dev->lock_rwsem);
4060        return ret;
4061}
4062
4063static void rbd_acquire_lock(struct work_struct *work)
4064{
4065        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4066                                            struct rbd_device, lock_dwork);
4067        int ret;
4068
4069        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4070again:
4071        ret = rbd_try_acquire_lock(rbd_dev);
4072        if (ret <= 0) {
4073                dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4074                return;
4075        }
4076
4077        ret = rbd_request_lock(rbd_dev);
4078        if (ret == -ETIMEDOUT) {
4079                goto again; /* treat this as a dead client */
4080        } else if (ret == -EROFS) {
4081                rbd_warn(rbd_dev, "peer will not release lock");
4082                down_write(&rbd_dev->lock_rwsem);
4083                wake_lock_waiters(rbd_dev, ret);
4084                up_write(&rbd_dev->lock_rwsem);
4085        } else if (ret < 0) {
4086                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4087                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4088                                 RBD_RETRY_DELAY);
4089        } else {
4090                /*
4091                 * lock owner acked, but resend if we don't see them
4092                 * release the lock
4093                 */
4094                dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4095                     rbd_dev);
4096                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4097                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4098        }
4099}
4100
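/*
 * Transition to RBD_LOCK_STATE_RELEASING and wait for running_list to
 * drain (rbd_lock_del_request() completes releasing_wait when the last
 * request goes away), dropping lock_rwsem while waiting.  Returns true
 * if the lock can now be released.
 */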
4101static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4102{
4103        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4104        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4105
4106        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4107                return false;
4108
4109        /*
4110         * Ensure that all in-flight IO is flushed.
4111         */
4112        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4113        rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4114        if (list_empty(&rbd_dev->running_list))
4115                return true;
4116
4117        up_write(&rbd_dev->lock_rwsem);
4118        wait_for_completion(&rbd_dev->releasing_wait);
4119
4120        down_write(&rbd_dev->lock_rwsem);
4121        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4122                return false;
4123
4124        rbd_assert(list_empty(&rbd_dev->running_list));
4125        return true;
4126}
4127
4128static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4129{
4130        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4131                rbd_object_map_close(rbd_dev);
4132}
4133
4134static void __rbd_release_lock(struct rbd_device *rbd_dev)
4135{
4136        rbd_assert(list_empty(&rbd_dev->running_list));
4137
4138        rbd_pre_release_action(rbd_dev);
4139        rbd_unlock(rbd_dev);
4140}
4141
4142/*
4143 * lock_rwsem must be held for write
4144 */
4145static void rbd_release_lock(struct rbd_device *rbd_dev)
4146{
4147        if (!rbd_quiesce_lock(rbd_dev))
4148                return;
4149
4150        __rbd_release_lock(rbd_dev);
4151
4152        /*
4153         * Give others a chance to grab the lock - we would re-acquire
4154         * almost immediately if we got new IO while draining the running
4155         * list otherwise.  We need to ack our own notifications, so this
4156         * lock_dwork will be requeued from rbd_handle_released_lock() by
4157         * way of maybe_kick_acquire().
4158         */
4159        cancel_delayed_work(&rbd_dev->lock_dwork);
4160}
4161
4162static void rbd_release_lock_work(struct work_struct *work)
4163{
4164        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4165                                                  unlock_work);
4166
4167        down_write(&rbd_dev->lock_rwsem);
4168        rbd_release_lock(rbd_dev);
4169        up_write(&rbd_dev->lock_rwsem);
4170}
4171
4172static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4173{
4174        bool have_requests;
4175
4176        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4177        if (__rbd_is_lock_owner(rbd_dev))
4178                return;
4179
4180        spin_lock(&rbd_dev->lock_lists_lock);
4181        have_requests = !list_empty(&rbd_dev->acquiring_list);
4182        spin_unlock(&rbd_dev->lock_lists_lock);
4183        if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4184                dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4185                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4186        }
4187}
4188
4189static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4190                                     void **p)
4191{
4192        struct rbd_client_id cid = { 0 };
4193
4194        if (struct_v >= 2) {
4195                cid.gid = ceph_decode_64(p);
4196                cid.handle = ceph_decode_64(p);
4197        }
4198
4199        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4200             cid.handle);
4201        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4202                down_write(&rbd_dev->lock_rwsem);
4203                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4204                        dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4205                             __func__, rbd_dev, cid.gid, cid.handle);
4206                } else {
4207                        rbd_set_owner_cid(rbd_dev, &cid);
4208                }
4209                downgrade_write(&rbd_dev->lock_rwsem);
4210        } else {
4211                down_read(&rbd_dev->lock_rwsem);
4212        }
4213
4214        maybe_kick_acquire(rbd_dev);
4215        up_read(&rbd_dev->lock_rwsem);
4216}
4217
4218static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4219                                     void **p)
4220{
4221        struct rbd_client_id cid = { 0 };
4222
4223        if (struct_v >= 2) {
4224                cid.gid = ceph_decode_64(p);
4225                cid.handle = ceph_decode_64(p);
4226        }
4227
4228        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4229             cid.handle);
4230        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4231                down_write(&rbd_dev->lock_rwsem);
4232                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4233                        dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4234                             __func__, rbd_dev, cid.gid, cid.handle,
4235                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4236                } else {
4237                        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4238                }
4239                downgrade_write(&rbd_dev->lock_rwsem);
4240        } else {
4241                down_read(&rbd_dev->lock_rwsem);
4242        }
4243
4244        maybe_kick_acquire(rbd_dev);
4245        up_read(&rbd_dev->lock_rwsem);
4246}
4247
4248/*
4249 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4250 * ResponseMessage is needed.
4251 */
4252static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4253                                   void **p)
4254{
4255        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4256        struct rbd_client_id cid = { 0 };
4257        int result = 1;
4258
4259        if (struct_v >= 2) {
4260                cid.gid = ceph_decode_64(p);
4261                cid.handle = ceph_decode_64(p);
4262        }
4263
4264        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4265             cid.handle);
4266        if (rbd_cid_equal(&cid, &my_cid))
4267                return result;
4268
4269        down_read(&rbd_dev->lock_rwsem);
4270        if (__rbd_is_lock_owner(rbd_dev)) {
4271                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4272                    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4273                        goto out_unlock;
4274
4275                /*
4276                 * encode ResponseMessage(0) so the peer can detect
4277                 * a missing owner
4278                 */
4279                result = 0;
4280
4281                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4282                        if (!rbd_dev->opts->exclusive) {
4283                                dout("%s rbd_dev %p queueing unlock_work\n",
4284                                     __func__, rbd_dev);
4285                                queue_work(rbd_dev->task_wq,
4286                                           &rbd_dev->unlock_work);
4287                        } else {
4288                                /* refuse to release the lock */
4289                                result = -EROFS;
4290                        }
4291                }
4292        }
4293
4294out_unlock:
4295        up_read(&rbd_dev->lock_rwsem);
4296        return result;
4297}
4298
4299static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4300                                     u64 notify_id, u64 cookie, s32 *result)
4301{
4302        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4303        char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4304        int buf_size = sizeof(buf);
4305        int ret;
4306
4307        if (result) {
4308                void *p = buf;
4309
4310                /* encode ResponseMessage */
4311                ceph_start_encoding(&p, 1, 1,
4312                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
4313                ceph_encode_32(&p, *result);
4314        } else {
4315                buf_size = 0;
4316        }
4317
4318        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4319                                   &rbd_dev->header_oloc, notify_id, cookie,
4320                                   buf, buf_size);
4321        if (ret)
4322                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4323}
4324
4325static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4326                                   u64 cookie)
4327{
4328        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4329        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4330}
4331
4332static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4333                                          u64 notify_id, u64 cookie, s32 result)
4334{
4335        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4336        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4337}
4338
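/*
 * watch/notify callback.  A non-empty payload is a NotifyMessage:
 * encoding envelope, notify_op (u32), then an op-specific payload
 * (ClientId for the lock ops).  An empty payload is a legacy header
 * update notification.
 */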
4339static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4340                         u64 notifier_id, void *data, size_t data_len)
4341{
4342        struct rbd_device *rbd_dev = arg;
4343        void *p = data;
4344        void *const end = p + data_len;
4345        u8 struct_v = 0;
4346        u32 len;
4347        u32 notify_op;
4348        int ret;
4349
4350        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4351             __func__, rbd_dev, cookie, notify_id, data_len);
4352        if (data_len) {
4353                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4354                                          &struct_v, &len);
4355                if (ret) {
4356                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4357                                 ret);
4358                        return;
4359                }
4360
4361                notify_op = ceph_decode_32(&p);
4362        } else {
4363                /* legacy notification for header updates */
4364                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4365                len = 0;
4366        }
4367
4368        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4369        switch (notify_op) {
4370        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4371                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4372                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4373                break;
4374        case RBD_NOTIFY_OP_RELEASED_LOCK:
4375                rbd_handle_released_lock(rbd_dev, struct_v, &p);
4376                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4377                break;
4378        case RBD_NOTIFY_OP_REQUEST_LOCK:
4379                ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4380                if (ret <= 0)
4381                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4382                                                      cookie, ret);
4383                else
4384                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4385                break;
4386        case RBD_NOTIFY_OP_HEADER_UPDATE:
4387                ret = rbd_dev_refresh(rbd_dev);
4388                if (ret)
4389                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
4390
4391                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4392                break;
4393        default:
4394                if (rbd_is_lock_owner(rbd_dev))
4395                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4396                                                      cookie, -EOPNOTSUPP);
4397                else
4398                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4399                break;
4400        }
4401}
4402
4403static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4404
4405static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4406{
4407        struct rbd_device *rbd_dev = arg;
4408
4409        rbd_warn(rbd_dev, "encountered watch error: %d", err);
4410
4411        down_write(&rbd_dev->lock_rwsem);
4412        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4413        up_write(&rbd_dev->lock_rwsem);
4414
4415        mutex_lock(&rbd_dev->watch_mutex);
4416        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4417                __rbd_unregister_watch(rbd_dev);
4418                rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4419
4420                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4421        }
4422        mutex_unlock(&rbd_dev->watch_mutex);
4423}
4424
4425/*
4426 * watch_mutex must be locked
4427 */
4428static int __rbd_register_watch(struct rbd_device *rbd_dev)
4429{
4430        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4431        struct ceph_osd_linger_request *handle;
4432
4433        rbd_assert(!rbd_dev->watch_handle);
4434        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4435
4436        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4437                                 &rbd_dev->header_oloc, rbd_watch_cb,
4438                                 rbd_watch_errcb, rbd_dev);
4439        if (IS_ERR(handle))
4440                return PTR_ERR(handle);
4441
4442        rbd_dev->watch_handle = handle;
4443        return 0;
4444}
4445
4446/*
4447 * watch_mutex must be locked
4448 */
4449static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4450{
4451        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4452        int ret;
4453
4454        rbd_assert(rbd_dev->watch_handle);
4455        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4456
4457        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4458        if (ret)
4459                rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4460
4461        rbd_dev->watch_handle = NULL;
4462}
4463
4464static int rbd_register_watch(struct rbd_device *rbd_dev)
4465{
4466        int ret;
4467
4468        mutex_lock(&rbd_dev->watch_mutex);
4469        rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4470        ret = __rbd_register_watch(rbd_dev);
4471        if (ret)
4472                goto out;
4473
4474        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4475        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4476
4477out:
4478        mutex_unlock(&rbd_dev->watch_mutex);
4479        return ret;
4480}
4481
4482static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4483{
4484        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4485
4486        cancel_work_sync(&rbd_dev->acquired_lock_work);
4487        cancel_work_sync(&rbd_dev->released_lock_work);
4488        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4489        cancel_work_sync(&rbd_dev->unlock_work);
4490}
4491
4492/*
4493 * header_rwsem must not be held to avoid a deadlock with
4494 * rbd_dev_refresh() when flushing notifies.
4495 */
4496static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4497{
4498        cancel_tasks_sync(rbd_dev);
4499
4500        mutex_lock(&rbd_dev->watch_mutex);
4501        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4502                __rbd_unregister_watch(rbd_dev);
4503        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4504        mutex_unlock(&rbd_dev->watch_mutex);
4505
4506        cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4507        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4508}
4509
4510/*
4511 * lock_rwsem must be held for write
4512 */
4513static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4514{
4515        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4516        char cookie[32];
4517        int ret;
4518
4519        if (!rbd_quiesce_lock(rbd_dev))
4520                return;
4521
4522        format_lock_cookie(rbd_dev, cookie);
4523        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4524                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4525                                  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4526                                  RBD_LOCK_TAG, cookie);
4527        if (ret) {
4528                if (ret != -EOPNOTSUPP)
4529                        rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4530                                 ret);
4531
4532                /*
4533                 * Lock cookie cannot be updated on older OSDs, so do
4534                 * a manual release and queue an acquire.
4535                 */
4536                __rbd_release_lock(rbd_dev);
4537                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4538        } else {
4539                __rbd_lock(rbd_dev, cookie);
4540                wake_lock_waiters(rbd_dev, 0);
4541        }
4542}
4543
4544static void rbd_reregister_watch(struct work_struct *work)
4545{
4546        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4547                                            struct rbd_device, watch_dwork);
4548        int ret;
4549
4550        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4551
4552        mutex_lock(&rbd_dev->watch_mutex);
4553        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4554                mutex_unlock(&rbd_dev->watch_mutex);
4555                return;
4556        }
4557
4558        ret = __rbd_register_watch(rbd_dev);
4559        if (ret) {
4560                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4561                if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4562                        queue_delayed_work(rbd_dev->task_wq,
4563                                           &rbd_dev->watch_dwork,
4564                                           RBD_RETRY_DELAY);
4565                        mutex_unlock(&rbd_dev->watch_mutex);
4566                        return;
4567                }
4568
4569                mutex_unlock(&rbd_dev->watch_mutex);
4570                down_write(&rbd_dev->lock_rwsem);
4571                wake_lock_waiters(rbd_dev, ret);
4572                up_write(&rbd_dev->lock_rwsem);
4573                return;
4574        }
4575
4576        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4577        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4578        mutex_unlock(&rbd_dev->watch_mutex);
4579
4580        down_write(&rbd_dev->lock_rwsem);
4581        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4582                rbd_reacquire_lock(rbd_dev);
4583        up_write(&rbd_dev->lock_rwsem);
4584
4585        ret = rbd_dev_refresh(rbd_dev);
4586        if (ret)
4587                rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4588}
4589
4590/*
4591 * Synchronous osd object method call.  Returns the number of bytes
4592 * returned in the inbound (reply) buffer, or a negative error code.
4593 */
4594static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4595                             struct ceph_object_id *oid,
4596                             struct ceph_object_locator *oloc,
4597                             const char *method_name,
4598                             const void *outbound,
4599                             size_t outbound_size,
4600                             void *inbound,
4601                             size_t inbound_size)
4602{
4603        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4604        struct page *req_page = NULL;
4605        struct page *reply_page;
4606        int ret;
4607
4608        /*
4609         * Method calls are ultimately read operations.  The result
4610         * should be placed into the inbound buffer provided.  Callers
4611         * may also supply outbound data--parameters for the object
4612         * method.  Currently, if outbound data is present it will be a
4613         * snapshot id.
4614         */
4615        if (outbound) {
4616                if (outbound_size > PAGE_SIZE)
4617                        return -E2BIG;
4618
4619                req_page = alloc_page(GFP_KERNEL);
4620                if (!req_page)
4621                        return -ENOMEM;
4622
4623                memcpy(page_address(req_page), outbound, outbound_size);
4624        }
4625
4626        reply_page = alloc_page(GFP_KERNEL);
4627        if (!reply_page) {
4628                if (req_page)
4629                        __free_page(req_page);
4630                return -ENOMEM;
4631        }
4632
4633        ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4634                             CEPH_OSD_FLAG_READ, req_page, outbound_size,
4635                             &reply_page, &inbound_size);
4636        if (!ret) {
4637                memcpy(inbound, page_address(reply_page), inbound_size);
4638                ret = inbound_size;
4639        }
4640
4641        if (req_page)
4642                __free_page(req_page);
4643        __free_page(reply_page);
4644        return ret;
4645}
4646
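/*
 * Workqueue function that services a block layer request: it captures
 * the current header/mapping, validates the extent against the mapping
 * size and hands the filled image request off to the state machine.
 */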
4647static void rbd_queue_workfn(struct work_struct *work)
4648{
4649        struct rbd_img_request *img_request =
4650            container_of(work, struct rbd_img_request, work);
4651        struct rbd_device *rbd_dev = img_request->rbd_dev;
4652        enum obj_operation_type op_type = img_request->op_type;
4653        struct request *rq = blk_mq_rq_from_pdu(img_request);
4654        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4655        u64 length = blk_rq_bytes(rq);
4656        u64 mapping_size;
4657        int result;
4658
4659        /* Ignore/skip any zero-length requests */
4660        if (!length) {
4661                dout("%s: zero-length request\n", __func__);
4662                result = 0;
4663                goto err_img_request;
4664        }
4665
4666        blk_mq_start_request(rq);
4667
4668        down_read(&rbd_dev->header_rwsem);
4669        mapping_size = rbd_dev->mapping.size;
4670        rbd_img_capture_header(img_request);
4671        up_read(&rbd_dev->header_rwsem);
4672
4673        if (offset + length > mapping_size) {
4674                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4675                         length, mapping_size);
4676                result = -EIO;
4677                goto err_img_request;
4678        }
4679
4680        dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4681             img_request, obj_op_name(op_type), offset, length);
4682
4683        if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4684                result = rbd_img_fill_nodata(img_request, offset, length);
4685        else
4686                result = rbd_img_fill_from_bio(img_request, offset, length,
4687                                               rq->bio);
4688        if (result)
4689                goto err_img_request;
4690
4691        rbd_img_handle_request(img_request, 0);
4692        return;
4693
4694err_img_request:
4695        rbd_img_request_destroy(img_request);
4696        if (result)
4697                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4698                         obj_op_name(op_type), length, offset, result);
4699        blk_mq_end_request(rq, errno_to_blk_status(result));
4700}
4701
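/*
 * blk-mq ->queue_rq() hook.  Maps the request operation to an rbd
 * object operation type, rejects writes on read-only mappings and
 * defers the actual work to rbd_wq via rbd_queue_workfn().
 */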
4702static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4703                const struct blk_mq_queue_data *bd)
4704{
4705        struct rbd_device *rbd_dev = hctx->queue->queuedata;
4706        struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4707        enum obj_operation_type op_type;
4708
4709        switch (req_op(bd->rq)) {
4710        case REQ_OP_DISCARD:
4711                op_type = OBJ_OP_DISCARD;
4712                break;
4713        case REQ_OP_WRITE_ZEROES:
4714                op_type = OBJ_OP_ZEROOUT;
4715                break;
4716        case REQ_OP_WRITE:
4717                op_type = OBJ_OP_WRITE;
4718                break;
4719        case REQ_OP_READ:
4720                op_type = OBJ_OP_READ;
4721                break;
4722        default:
4723                rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4724                return BLK_STS_IOERR;
4725        }
4726
4727        rbd_img_request_init(img_req, rbd_dev, op_type);
4728
4729        if (rbd_img_is_write(img_req)) {
4730                if (rbd_is_ro(rbd_dev)) {
4731                        rbd_warn(rbd_dev, "%s on read-only mapping",
4732                                 obj_op_name(img_req->op_type));
4733                        return BLK_STS_IOERR;
4734                }
4735                rbd_assert(!rbd_is_snap(rbd_dev));
4736        }
4737
4738        INIT_WORK(&img_req->work, rbd_queue_workfn);
4739        queue_work(rbd_wq, &img_req->work);
4740        return BLK_STS_OK;
4741}
4742
4743static void rbd_free_disk(struct rbd_device *rbd_dev)
4744{
4745        blk_cleanup_disk(rbd_dev->disk);
4746        blk_mq_free_tag_set(&rbd_dev->tag_set);
4747        rbd_dev->disk = NULL;
4748}
4749
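/*
 * Synchronously read up to @buf_len bytes from the start of the given
 * object into @buf.  Returns the number of bytes read or a negative
 * error code.
 */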
4750static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4751                             struct ceph_object_id *oid,
4752                             struct ceph_object_locator *oloc,
4753                             void *buf, int buf_len)
4754
4755{
4756        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4757        struct ceph_osd_request *req;
4758        struct page **pages;
4759        int num_pages = calc_pages_for(0, buf_len);
4760        int ret;
4761
4762        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4763        if (!req)
4764                return -ENOMEM;
4765
4766        ceph_oid_copy(&req->r_base_oid, oid);
4767        ceph_oloc_copy(&req->r_base_oloc, oloc);
4768        req->r_flags = CEPH_OSD_FLAG_READ;
4769
4770        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4771        if (IS_ERR(pages)) {
4772                ret = PTR_ERR(pages);
4773                goto out_req;
4774        }
4775
4776        osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4777        osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4778                                         true);
4779
4780        ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4781        if (ret)
4782                goto out_req;
4783
4784        ceph_osdc_start_request(osdc, req, false);
4785        ret = ceph_osdc_wait_request(osdc, req);
4786        if (ret >= 0)
4787                ceph_copy_from_page_vector(pages, buf, 0, ret);
4788
4789out_req:
4790        ceph_osdc_put_request(req);
4791        return ret;
4792}
4793
4794/*
4795 * Read the complete header for the given rbd device.  On successful
4796 * return, the rbd_dev->header field will contain up-to-date
4797 * information about the image.
4798 */
4799static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4800{
4801        struct rbd_image_header_ondisk *ondisk = NULL;
4802        u32 snap_count = 0;
4803        u64 names_size = 0;
4804        u32 want_count;
4805        int ret;
4806
4807        /*
4808         * The complete header will include an array of its 64-bit
4809         * snapshot ids, followed by the names of those snapshots as
4810         * a contiguous block of NUL-terminated strings.  Note that
4811         * the number of snapshots could change by the time we read
4812         * it in, in which case we re-read it.
4813         */
4814        do {
4815                size_t size;
4816
4817                kfree(ondisk);
4818
4819                size = sizeof (*ondisk);
4820                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4821                size += names_size;
4822                ondisk = kmalloc(size, GFP_KERNEL);
4823                if (!ondisk)
4824                        return -ENOMEM;
4825
4826                ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4827                                        &rbd_dev->header_oloc, ondisk, size);
4828                if (ret < 0)
4829                        goto out;
4830                if ((size_t)ret < size) {
4831                        ret = -ENXIO;
4832                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4833                                size, ret);
4834                        goto out;
4835                }
4836                if (!rbd_dev_ondisk_valid(ondisk)) {
4837                        ret = -ENXIO;
4838                        rbd_warn(rbd_dev, "invalid header");
4839                        goto out;
4840                }
4841
4842                names_size = le64_to_cpu(ondisk->snap_names_len);
4843                want_count = snap_count;
4844                snap_count = le32_to_cpu(ondisk->snap_count);
4845        } while (snap_count != want_count);
4846
4847        ret = rbd_header_from_disk(rbd_dev, ondisk);
4848out:
4849        kfree(ondisk);
4850
4851        return ret;
4852}
4853
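/*
 * Propagate the current mapping size to the gendisk capacity and
 * notify user space of the change.
 */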
4854static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4855{
4856        sector_t size;
4857
4858        /*
4859         * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4860         * try to update its size.  If REMOVING is set, updating size
4861         * is just useless work since the device can't be opened.
4862         */
4863        if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4864            !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4865                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4866                dout("setting size to %llu sectors", (unsigned long long)size);
4867                set_capacity_and_notify(rbd_dev->disk, size);
4868        }
4869}
4870
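/*
 * Re-read the image header (and parent info for layered images) under
 * header_rwsem and update the block device capacity if the mapping
 * size changed.
 */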
4871static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4872{
4873        u64 mapping_size;
4874        int ret;
4875
4876        down_write(&rbd_dev->header_rwsem);
4877        mapping_size = rbd_dev->mapping.size;
4878
4879        ret = rbd_dev_header_info(rbd_dev);
4880        if (ret)
4881                goto out;
4882
4883        /*
4884         * If there is a parent, see if it has disappeared due to the
4885         * mapped image getting flattened.
4886         */
4887        if (rbd_dev->parent) {
4888                ret = rbd_dev_v2_parent_info(rbd_dev);
4889                if (ret)
4890                        goto out;
4891        }
4892
4893        rbd_assert(!rbd_is_snap(rbd_dev));
4894        rbd_dev->mapping.size = rbd_dev->header.image_size;
4895
4896out:
4897        up_write(&rbd_dev->header_rwsem);
4898        if (!ret && mapping_size != rbd_dev->mapping.size)
4899                rbd_dev_update_size(rbd_dev);
4900
4901        return ret;
4902}
4903
4904static const struct blk_mq_ops rbd_mq_ops = {
4905        .queue_rq       = rbd_queue_rq,
4906};
4907
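/*
 * Set up the blk-mq tag set and gendisk for the mapping.  Queue limits
 * are derived from the object set size (object_size * stripe_count)
 * and the alloc_size/trim mapping options.
 */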
4908static int rbd_init_disk(struct rbd_device *rbd_dev)
4909{
4910        struct gendisk *disk;
4911        struct request_queue *q;
4912        unsigned int objset_bytes =
4913            rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
4914        int err;
4915
4916        memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4917        rbd_dev->tag_set.ops = &rbd_mq_ops;
4918        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4919        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4920        rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4921        rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4922        rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
4923
4924        err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4925        if (err)
4926                return err;
4927
4928        disk = blk_mq_alloc_disk(&rbd_dev->tag_set, rbd_dev);
4929        if (IS_ERR(disk)) {
4930                err = PTR_ERR(disk);
4931                goto out_tag_set;
4932        }
4933        q = disk->queue;
4934
4935        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4936                 rbd_dev->dev_id);
4937        disk->major = rbd_dev->major;
4938        disk->first_minor = rbd_dev->minor;
4939        if (single_major) {
4940                disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT);
4941                disk->flags |= GENHD_FL_EXT_DEVT;
4942        } else {
4943                disk->minors = RBD_MINORS_PER_MAJOR;
4944        }
4945        disk->fops = &rbd_bd_ops;
4946        disk->private_data = rbd_dev;
4947
4948        blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
4949        /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4950
4951        blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
4952        q->limits.max_sectors = queue_max_hw_sectors(q);
4953        blk_queue_max_segments(q, USHRT_MAX);
4954        blk_queue_max_segment_size(q, UINT_MAX);
4955        blk_queue_io_min(q, rbd_dev->opts->alloc_size);
4956        blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
4957
4958        if (rbd_dev->opts->trim) {
4959                blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
4960                q->limits.discard_granularity = rbd_dev->opts->alloc_size;
4961                blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
4962                blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
4963        }
4964
4965        if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4966                blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
4967
4968        rbd_dev->disk = disk;
4969
4970        return 0;
4971out_tag_set:
4972        blk_mq_free_tag_set(&rbd_dev->tag_set);
4973        return err;
4974}
4975
4976/*
4977 * sysfs
4978 */
4979
4980static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4981{
4982        return container_of(dev, struct rbd_device, dev);
4983}
4984
4985static ssize_t rbd_size_show(struct device *dev,
4986                             struct device_attribute *attr, char *buf)
4987{
4988        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4989
4990        return sprintf(buf, "%llu\n",
4991                (unsigned long long)rbd_dev->mapping.size);
4992}
4993
4994static ssize_t rbd_features_show(struct device *dev,
4995                             struct device_attribute *attr, char *buf)
4996{
4997        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4998
4999        return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5000}
5001
5002static ssize_t rbd_major_show(struct device *dev,
5003                              struct device_attribute *attr, char *buf)
5004{
5005        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5006
5007        if (rbd_dev->major)
5008                return sprintf(buf, "%d\n", rbd_dev->major);
5009
5010        return sprintf(buf, "(none)\n");
5011}
5012
5013static ssize_t rbd_minor_show(struct device *dev,
5014                              struct device_attribute *attr, char *buf)
5015{
5016        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5017
5018        return sprintf(buf, "%d\n", rbd_dev->minor);
5019}
5020
5021static ssize_t rbd_client_addr_show(struct device *dev,
5022                                    struct device_attribute *attr, char *buf)
5023{
5024        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5025        struct ceph_entity_addr *client_addr =
5026            ceph_client_addr(rbd_dev->rbd_client->client);
5027
5028        return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5029                       le32_to_cpu(client_addr->nonce));
5030}
5031
5032static ssize_t rbd_client_id_show(struct device *dev,
5033                                  struct device_attribute *attr, char *buf)
5034{
5035        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5036
5037        return sprintf(buf, "client%lld\n",
5038                       ceph_client_gid(rbd_dev->rbd_client->client));
5039}
5040
5041static ssize_t rbd_cluster_fsid_show(struct device *dev,
5042                                     struct device_attribute *attr, char *buf)
5043{
5044        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5045
5046        return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5047}
5048
5049static ssize_t rbd_config_info_show(struct device *dev,
5050                                    struct device_attribute *attr, char *buf)
5051{
5052        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5053
5054        if (!capable(CAP_SYS_ADMIN))
5055                return -EPERM;
5056
5057        return sprintf(buf, "%s\n", rbd_dev->config_info);
5058}
5059
5060static ssize_t rbd_pool_show(struct device *dev,
5061                             struct device_attribute *attr, char *buf)
5062{
5063        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5064
5065        return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5066}
5067
5068static ssize_t rbd_pool_id_show(struct device *dev,
5069                             struct device_attribute *attr, char *buf)
5070{
5071        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5072
5073        return sprintf(buf, "%llu\n",
5074                        (unsigned long long) rbd_dev->spec->pool_id);
5075}
5076
5077static ssize_t rbd_pool_ns_show(struct device *dev,
5078                                struct device_attribute *attr, char *buf)
5079{
5080        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5081
5082        return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5083}
5084
5085static ssize_t rbd_name_show(struct device *dev,
5086                             struct device_attribute *attr, char *buf)
5087{
5088        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5089
5090        if (rbd_dev->spec->image_name)
5091                return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5092
5093        return sprintf(buf, "(unknown)\n");
5094}
5095
5096static ssize_t rbd_image_id_show(struct device *dev,
5097                             struct device_attribute *attr, char *buf)
5098{
5099        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5100
5101        return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5102}
5103
5104/*
5105 * Shows the name of the currently-mapped snapshot (or
5106 * RBD_SNAP_HEAD_NAME for the base image).
5107 */
5108static ssize_t rbd_snap_show(struct device *dev,
5109                             struct device_attribute *attr,
5110                             char *buf)
5111{
5112        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5113
5114        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5115}
5116
5117static ssize_t rbd_snap_id_show(struct device *dev,
5118                                struct device_attribute *attr, char *buf)
5119{
5120        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5121
5122        return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5123}
5124
5125/*
5126 * For a v2 image, shows the chain of parent images, separated by empty
5127 * lines.  For v1 images or if there is no parent, shows "(no parent
5128 * image)".
5129 */
5130static ssize_t rbd_parent_show(struct device *dev,
5131                               struct device_attribute *attr,
5132                               char *buf)
5133{
5134        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5135        ssize_t count = 0;
5136
5137        if (!rbd_dev->parent)
5138                return sprintf(buf, "(no parent image)\n");
5139
5140        for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5141                struct rbd_spec *spec = rbd_dev->parent_spec;
5142
5143                count += sprintf(&buf[count], "%s"
5144                            "pool_id %llu\npool_name %s\n"
5145                            "pool_ns %s\n"
5146                            "image_id %s\nimage_name %s\n"
5147                            "snap_id %llu\nsnap_name %s\n"
5148                            "overlap %llu\n",
5149                            !count ? "" : "\n", /* first? */
5150                            spec->pool_id, spec->pool_name,
5151                            spec->pool_ns ?: "",
5152                            spec->image_id, spec->image_name ?: "(unknown)",
5153                            spec->snap_id, spec->snap_name,
5154                            rbd_dev->parent_overlap);
5155        }
5156
5157        return count;
5158}
5159
5160static ssize_t rbd_image_refresh(struct device *dev,
5161                                 struct device_attribute *attr,
5162                                 const char *buf,
5163                                 size_t size)
5164{
5165        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5166        int ret;
5167
5168        if (!capable(CAP_SYS_ADMIN))
5169                return -EPERM;
5170
5171        ret = rbd_dev_refresh(rbd_dev);
5172        if (ret)
5173                return ret;
5174
5175        return size;
5176}
5177
5178static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5179static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5180static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5181static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5182static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5183static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5184static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5185static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5186static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5187static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5188static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5189static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5190static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5191static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5192static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5193static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5194static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5195
5196static struct attribute *rbd_attrs[] = {
5197        &dev_attr_size.attr,
5198        &dev_attr_features.attr,
5199        &dev_attr_major.attr,
5200        &dev_attr_minor.attr,
5201        &dev_attr_client_addr.attr,
5202        &dev_attr_client_id.attr,
5203        &dev_attr_cluster_fsid.attr,
5204        &dev_attr_config_info.attr,
5205        &dev_attr_pool.attr,
5206        &dev_attr_pool_id.attr,
5207        &dev_attr_pool_ns.attr,
5208        &dev_attr_name.attr,
5209        &dev_attr_image_id.attr,
5210        &dev_attr_current_snap.attr,
5211        &dev_attr_snap_id.attr,
5212        &dev_attr_parent.attr,
5213        &dev_attr_refresh.attr,
5214        NULL
5215};
5216
5217static struct attribute_group rbd_attr_group = {
5218        .attrs = rbd_attrs,
5219};
5220
5221static const struct attribute_group *rbd_attr_groups[] = {
5222        &rbd_attr_group,
5223        NULL
5224};
5225
5226static void rbd_dev_release(struct device *dev);
5227
5228static const struct device_type rbd_device_type = {
5229        .name           = "rbd",
5230        .groups         = rbd_attr_groups,
5231        .release        = rbd_dev_release,
5232};
5233
5234static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5235{
5236        kref_get(&spec->kref);
5237
5238        return spec;
5239}
5240
5241static void rbd_spec_free(struct kref *kref);
5242static void rbd_spec_put(struct rbd_spec *spec)
5243{
5244        if (spec)
5245                kref_put(&spec->kref, rbd_spec_free);
5246}
5247
5248static struct rbd_spec *rbd_spec_alloc(void)
5249{
5250        struct rbd_spec *spec;
5251
5252        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5253        if (!spec)
5254                return NULL;
5255
5256        spec->pool_id = CEPH_NOPOOL;
5257        spec->snap_id = CEPH_NOSNAP;
5258        kref_init(&spec->kref);
5259
5260        return spec;
5261}
5262
5263static void rbd_spec_free(struct kref *kref)
5264{
5265        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5266
5267        kfree(spec->pool_name);
5268        kfree(spec->pool_ns);
5269        kfree(spec->image_id);
5270        kfree(spec->image_name);
5271        kfree(spec->snap_name);
5272        kfree(spec);
5273}
5274
5275static void rbd_dev_free(struct rbd_device *rbd_dev)
5276{
5277        WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5278        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5279
5280        ceph_oid_destroy(&rbd_dev->header_oid);
5281        ceph_oloc_destroy(&rbd_dev->header_oloc);
5282        kfree(rbd_dev->config_info);
5283
5284        rbd_put_client(rbd_dev->rbd_client);
5285        rbd_spec_put(rbd_dev->spec);
5286        kfree(rbd_dev->opts);
5287        kfree(rbd_dev);
5288}
5289
5290static void rbd_dev_release(struct device *dev)
5291{
5292        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5293        bool need_put = !!rbd_dev->opts;
5294
5295        if (need_put) {
5296                destroy_workqueue(rbd_dev->task_wq);
5297                ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5298        }
5299
5300        rbd_dev_free(rbd_dev);
5301
5302        /*
5303         * This is racy, but way better than dropping the module reference
5304         * outside of the release callback.  The race window is pretty small,
5305         * so doing something similar to dm (dm-builtin.c) is overkill.
5306         */
5307        if (need_put)
5308                module_put(THIS_MODULE);
5309}
5310
5311static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5312                                           struct rbd_spec *spec)
5313{
5314        struct rbd_device *rbd_dev;
5315
5316        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5317        if (!rbd_dev)
5318                return NULL;
5319
5320        spin_lock_init(&rbd_dev->lock);
5321        INIT_LIST_HEAD(&rbd_dev->node);
5322        init_rwsem(&rbd_dev->header_rwsem);
5323
5324        rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5325        ceph_oid_init(&rbd_dev->header_oid);
5326        rbd_dev->header_oloc.pool = spec->pool_id;
5327        if (spec->pool_ns) {
5328                WARN_ON(!*spec->pool_ns);
5329                rbd_dev->header_oloc.pool_ns =
5330                    ceph_find_or_create_string(spec->pool_ns,
5331                                               strlen(spec->pool_ns));
5332        }
5333
5334        mutex_init(&rbd_dev->watch_mutex);
5335        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5336        INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5337
5338        init_rwsem(&rbd_dev->lock_rwsem);
5339        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5340        INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5341        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5342        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5343        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5344        spin_lock_init(&rbd_dev->lock_lists_lock);
5345        INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5346        INIT_LIST_HEAD(&rbd_dev->running_list);
5347        init_completion(&rbd_dev->acquire_wait);
5348        init_completion(&rbd_dev->releasing_wait);
5349
5350        spin_lock_init(&rbd_dev->object_map_lock);
5351
5352        rbd_dev->dev.bus = &rbd_bus_type;
5353        rbd_dev->dev.type = &rbd_device_type;
5354        rbd_dev->dev.parent = &rbd_root_dev;
5355        device_initialize(&rbd_dev->dev);
5356
5357        rbd_dev->rbd_client = rbdc;
5358        rbd_dev->spec = spec;
5359
5360        return rbd_dev;
5361}
5362
5363/*
5364 * Create a mapping rbd_dev.
5365 */
5366static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5367                                         struct rbd_spec *spec,
5368                                         struct rbd_options *opts)
5369{
5370        struct rbd_device *rbd_dev;
5371
5372        rbd_dev = __rbd_dev_create(rbdc, spec);
5373        if (!rbd_dev)
5374                return NULL;
5375
5376        rbd_dev->opts = opts;
5377
5378        /* get an id and fill in device name */
5379        rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5380                                         minor_to_rbd_dev_id(1 << MINORBITS),
5381                                         GFP_KERNEL);
5382        if (rbd_dev->dev_id < 0)
5383                goto fail_rbd_dev;
5384
5385        sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5386        rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5387                                                   rbd_dev->name);
5388        if (!rbd_dev->task_wq)
5389                goto fail_dev_id;
5390
5391        /* we have a ref from do_rbd_add() */
5392        __module_get(THIS_MODULE);
5393
5394        dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5395        return rbd_dev;
5396
5397fail_dev_id:
5398        ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5399fail_rbd_dev:
5400        rbd_dev_free(rbd_dev);
5401        return NULL;
5402}
5403
5404static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5405{
5406        if (rbd_dev)
5407                put_device(&rbd_dev->dev);
5408}
5409
5410/*
5411 * Get the size and object order for an image snapshot, or if
5412 * snap_id is CEPH_NOSNAP, get this information for the base
5413 * image.
5414 */
5415static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5416                                u8 *order, u64 *snap_size)
5417{
5418        __le64 snapid = cpu_to_le64(snap_id);
5419        int ret;
5420        struct {
5421                u8 order;
5422                __le64 size;
5423        } __attribute__ ((packed)) size_buf = { 0 };
5424
5425        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5426                                  &rbd_dev->header_oloc, "get_size",
5427                                  &snapid, sizeof(snapid),
5428                                  &size_buf, sizeof(size_buf));
5429        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5430        if (ret < 0)
5431                return ret;
5432        if (ret < sizeof (size_buf))
5433                return -ERANGE;
5434
5435        if (order) {
5436                *order = size_buf.order;
5437                dout("  order %u", (unsigned int)*order);
5438        }
5439        *snap_size = le64_to_cpu(size_buf.size);
5440
5441        dout("  snap_id 0x%016llx snap_size = %llu\n",
5442                (unsigned long long)snap_id,
5443                (unsigned long long)*snap_size);
5444
5445        return 0;
5446}
5447
5448static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5449{
5450        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5451                                        &rbd_dev->header.obj_order,
5452                                        &rbd_dev->header.image_size);
5453}
5454
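/*
 * Fetch the image's object name prefix using the "get_object_prefix"
 * class method and store it in rbd_dev->header.object_prefix.
 */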
5455static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5456{
5457        size_t size;
5458        void *reply_buf;
5459        int ret;
5460        void *p;
5461
5462        /* Response will be an encoded string, which includes a length */
5463        size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5464        reply_buf = kzalloc(size, GFP_KERNEL);
5465        if (!reply_buf)
5466                return -ENOMEM;
5467
5468        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5469                                  &rbd_dev->header_oloc, "get_object_prefix",
5470                                  NULL, 0, reply_buf, size);
5471        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5472        if (ret < 0)
5473                goto out;
5474
5475        p = reply_buf;
5476        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5477                                                p + ret, NULL, GFP_NOIO);
5478        ret = 0;
5479
5480        if (IS_ERR(rbd_dev->header.object_prefix)) {
5481                ret = PTR_ERR(rbd_dev->header.object_prefix);
5482                rbd_dev->header.object_prefix = NULL;
5483        } else {
5484                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5485        }
5486out:
5487        kfree(reply_buf);
5488
5489        return ret;
5490}
5491
5492static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5493                                     bool read_only, u64 *snap_features)
5494{
5495        struct {
5496                __le64 snap_id;
5497                u8 read_only;
5498        } features_in;
5499        struct {
5500                __le64 features;
5501                __le64 incompat;
5502        } __attribute__ ((packed)) features_buf = { 0 };
5503        u64 unsup;
5504        int ret;
5505
5506        features_in.snap_id = cpu_to_le64(snap_id);
5507        features_in.read_only = read_only;
5508
5509        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5510                                  &rbd_dev->header_oloc, "get_features",
5511                                  &features_in, sizeof(features_in),
5512                                  &features_buf, sizeof(features_buf));
5513        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5514        if (ret < 0)
5515                return ret;
5516        if (ret < sizeof (features_buf))
5517                return -ERANGE;
5518
5519        unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5520        if (unsup) {
5521                rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5522                         unsup);
5523                return -ENXIO;
5524        }
5525
5526        *snap_features = le64_to_cpu(features_buf.features);
5527
5528        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5529                (unsigned long long)snap_id,
5530                (unsigned long long)*snap_features,
5531                (unsigned long long)le64_to_cpu(features_buf.incompat));
5532
5533        return 0;
5534}
5535
5536static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5537{
5538        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5539                                         rbd_is_ro(rbd_dev),
5540                                         &rbd_dev->header.features);
5541}
5542
5543/*
5544 * These are generic image flags, but since they are used only for
5545 * the object map, store them in rbd_dev->object_map_flags.
5546 *
5547 * For the same reason, this function is called only on object map
5548 * (re)load and not on header refresh.
5549 */
5550static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5551{
5552        __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5553        __le64 flags;
5554        int ret;
5555
5556        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5557                                  &rbd_dev->header_oloc, "get_flags",
5558                                  &snapid, sizeof(snapid),
5559                                  &flags, sizeof(flags));
5560        if (ret < 0)
5561                return ret;
5562        if (ret < sizeof(flags))
5563                return -EBADMSG;
5564
5565        rbd_dev->object_map_flags = le64_to_cpu(flags);
5566        return 0;
5567}
5568
5569struct parent_image_info {
5570        u64             pool_id;
5571        const char      *pool_ns;
5572        const char      *image_id;
5573        u64             snap_id;
5574
5575        bool            has_overlap;
5576        u64             overlap;
5577};
5578
5579/*
5580 * The caller is responsible for freeing the strings in @pii.
5581 */
5582static int decode_parent_image_spec(void **p, void *end,
5583                                    struct parent_image_info *pii)
5584{
5585        u8 struct_v;
5586        u32 struct_len;
5587        int ret;
5588
5589        ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5590                                  &struct_v, &struct_len);
5591        if (ret)
5592                return ret;
5593
5594        ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5595        pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5596        if (IS_ERR(pii->pool_ns)) {
5597                ret = PTR_ERR(pii->pool_ns);
5598                pii->pool_ns = NULL;
5599                return ret;
5600        }
5601        pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5602        if (IS_ERR(pii->image_id)) {
5603                ret = PTR_ERR(pii->image_id);
5604                pii->image_id = NULL;
5605                return ret;
5606        }
5607        ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5608        return 0;
5609
5610e_inval:
5611        return -EINVAL;
5612}
5613
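/*
 * Query parent information using the "parent_get" and
 * "parent_overlap_get" class methods.  Returns 1 if the OSD doesn't
 * support them, so the caller can fall back to the legacy method.
 */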
5614static int __get_parent_info(struct rbd_device *rbd_dev,
5615                             struct page *req_page,
5616                             struct page *reply_page,
5617                             struct parent_image_info *pii)
5618{
5619        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5620        size_t reply_len = PAGE_SIZE;
5621        void *p, *end;
5622        int ret;
5623
5624        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5625                             "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5626                             req_page, sizeof(u64), &reply_page, &reply_len);
5627        if (ret)
5628                return ret == -EOPNOTSUPP ? 1 : ret;
5629
5630        p = page_address(reply_page);
5631        end = p + reply_len;
5632        ret = decode_parent_image_spec(&p, end, pii);
5633        if (ret)
5634                return ret;
5635
5636        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5637                             "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5638                             req_page, sizeof(u64), &reply_page, &reply_len);
5639        if (ret)
5640                return ret;
5641
5642        p = page_address(reply_page);
5643        end = p + reply_len;
5644        ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5645        if (pii->has_overlap)
5646                ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5647
5648        return 0;
5649
5650e_inval:
5651        return -EINVAL;
5652}
5653
5654/*
5655 * The caller is responsible for freeing the strings in @pii.
5656 */
5657static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5658                                    struct page *req_page,
5659                                    struct page *reply_page,
5660                                    struct parent_image_info *pii)
5661{
5662        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5663        size_t reply_len = PAGE_SIZE;
5664        void *p, *end;
5665        int ret;
5666
5667        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5668                             "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5669                             req_page, sizeof(u64), &reply_page, &reply_len);
5670        if (ret)
5671                return ret;
5672
5673        p = page_address(reply_page);
5674        end = p + reply_len;
5675        ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5676        pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5677        if (IS_ERR(pii->image_id)) {
5678                ret = PTR_ERR(pii->image_id);
5679                pii->image_id = NULL;
5680                return ret;
5681        }
5682        ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5683        pii->has_overlap = true;
5684        ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5685
5686        return 0;
5687
5688e_inval:
5689        return -EINVAL;
5690}
5691
5692static int get_parent_info(struct rbd_device *rbd_dev,
5693                           struct parent_image_info *pii)
5694{
5695        struct page *req_page, *reply_page;
5696        void *p;
5697        int ret;
5698
5699        req_page = alloc_page(GFP_KERNEL);
5700        if (!req_page)
5701                return -ENOMEM;
5702
5703        reply_page = alloc_page(GFP_KERNEL);
5704        if (!reply_page) {
5705                __free_page(req_page);
5706                return -ENOMEM;
5707        }
5708
5709        p = page_address(req_page);
5710        ceph_encode_64(&p, rbd_dev->spec->snap_id);
5711        ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5712        if (ret > 0)
5713                ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5714                                               pii);
5715
5716        __free_page(req_page);
5717        __free_page(reply_page);
5718        return ret;
5719}
5720
5721static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5722{
5723        struct rbd_spec *parent_spec;
5724        struct parent_image_info pii = { 0 };
5725        int ret;
5726
5727        parent_spec = rbd_spec_alloc();
5728        if (!parent_spec)
5729                return -ENOMEM;
5730
5731        ret = get_parent_info(rbd_dev, &pii);
5732        if (ret)
5733                goto out_err;
5734
5735        dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5736             __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5737             pii.has_overlap, pii.overlap);
5738
5739        if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5740                /*
5741                 * Either the parent never existed, or we have a
5742                 * record of it but the image got flattened so it no
5743                 * longer has a parent.  When the parent of a
5744                 * layered image disappears we immediately set the
5745                 * overlap to 0.  The effect of this is that all new
5746                 * requests will be treated as if the image had no
5747                 * parent.
5748                 *
5749                 * If !pii.has_overlap, the parent image spec is not
5750                 * applicable.  It's there to avoid duplication in each
5751                 * snapshot record.
5752                 */
5753                if (rbd_dev->parent_overlap) {
5754                        rbd_dev->parent_overlap = 0;
5755                        rbd_dev_parent_put(rbd_dev);
5756                        pr_info("%s: clone image has been flattened\n",
5757                                rbd_dev->disk->disk_name);
5758                }
5759
5760                goto out;       /* No parent?  No problem. */
5761        }
5762
5763        /* The ceph file layout needs to fit pool id in 32 bits */
5764
5765        ret = -EIO;
5766        if (pii.pool_id > (u64)U32_MAX) {
5767                rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5768                        (unsigned long long)pii.pool_id, U32_MAX);
5769                goto out_err;
5770        }
5771
5772        /*
5773         * The parent won't change (except when the clone is
5774         * flattened, which is already handled above).  So we only need
5775         * to record the parent spec if we have not already done so.
5776         */
5777        if (!rbd_dev->parent_spec) {
5778                parent_spec->pool_id = pii.pool_id;
5779                if (pii.pool_ns && *pii.pool_ns) {
5780                        parent_spec->pool_ns = pii.pool_ns;
5781                        pii.pool_ns = NULL;
5782                }
5783                parent_spec->image_id = pii.image_id;
5784                pii.image_id = NULL;
5785                parent_spec->snap_id = pii.snap_id;
5786
5787                rbd_dev->parent_spec = parent_spec;
5788                parent_spec = NULL;     /* rbd_dev now owns this */
5789        }
5790
5791        /*
5792         * We always update the parent overlap.  If it's zero we issue
5793         * a warning, as we will proceed as if there was no parent.
5794         */
5795        if (!pii.overlap) {
5796                if (parent_spec) {
5797                        /* refresh, careful to warn just once */
5798                        if (rbd_dev->parent_overlap)
5799                                rbd_warn(rbd_dev,
5800                                    "clone now standalone (overlap became 0)");
5801                } else {
5802                        /* initial probe */
5803                        rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5804                }
5805        }
5806        rbd_dev->parent_overlap = pii.overlap;
5807
5808out:
5809        ret = 0;
5810out_err:
5811        kfree(pii.pool_ns);
5812        kfree(pii.image_id);
5813        rbd_spec_put(parent_spec);
5814        return ret;
5815}
5816
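/*
 * Fetch the image's stripe unit and stripe count using the
 * "get_stripe_unit_count" class method.
 */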
5817static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5818{
5819        struct {
5820                __le64 stripe_unit;
5821                __le64 stripe_count;
5822        } __attribute__ ((packed)) striping_info_buf = { 0 };
5823        size_t size = sizeof (striping_info_buf);
5824        void *p;
5825        int ret;
5826
5827        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5828                                &rbd_dev->header_oloc, "get_stripe_unit_count",
5829                                NULL, 0, &striping_info_buf, size);
5830        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5831        if (ret < 0)
5832                return ret;
5833        if (ret < size)
5834                return -ERANGE;
5835
5836        p = &striping_info_buf;
5837        rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5838        rbd_dev->header.stripe_count = ceph_decode_64(&p);
5839        return 0;
5840}
5841
5842static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5843{
5844        __le64 data_pool_id;
5845        int ret;
5846
5847        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5848                                  &rbd_dev->header_oloc, "get_data_pool",
5849                                  NULL, 0, &data_pool_id, sizeof(data_pool_id));
5850        if (ret < 0)
5851                return ret;
5852        if (ret < sizeof(data_pool_id))
5853                return -EBADMSG;
5854
5855        rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5856        WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5857        return 0;
5858}
5859
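/*
 * Look up the image name for this image id in the pool's RBD_DIRECTORY
 * object.  Returns a dynamically allocated name, or NULL if it can't
 * be determined (the name is not essential).
 */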
5860static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5861{
5862        CEPH_DEFINE_OID_ONSTACK(oid);
5863        size_t image_id_size;
5864        char *image_id;
5865        void *p;
5866        void *end;
5867        size_t size;
5868        void *reply_buf = NULL;
5869        size_t len = 0;
5870        char *image_name = NULL;
5871        int ret;
5872
5873        rbd_assert(!rbd_dev->spec->image_name);
5874
5875        len = strlen(rbd_dev->spec->image_id);
5876        image_id_size = sizeof (__le32) + len;
5877        image_id = kmalloc(image_id_size, GFP_KERNEL);
5878        if (!image_id)
5879                return NULL;
5880
5881        p = image_id;
5882        end = image_id + image_id_size;
5883        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5884
5885        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5886        reply_buf = kmalloc(size, GFP_KERNEL);
5887        if (!reply_buf)
5888                goto out;
5889
5890        ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5891        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5892                                  "dir_get_name", image_id, image_id_size,
5893                                  reply_buf, size);
5894        if (ret < 0)
5895                goto out;
5896        p = reply_buf;
5897        end = reply_buf + ret;
5898
5899        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5900        if (IS_ERR(image_name))
5901                image_name = NULL;
5902        else
5903                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5904out:
5905        kfree(reply_buf);
5906        kfree(image_id);
5907
5908        return image_name;
5909}
5910
5911static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5912{
5913        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5914        const char *snap_name;
5915        u32 which = 0;
5916
5917        /* Skip over names until we find the one we are looking for */
5918
5919        snap_name = rbd_dev->header.snap_names;
5920        while (which < snapc->num_snaps) {
5921                if (!strcmp(name, snap_name))
5922                        return snapc->snaps[which];
5923                snap_name += strlen(snap_name) + 1;
5924                which++;
5925        }
5926        return CEPH_NOSNAP;
5927}
5928
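/*
 * Find the id of a v2 snapshot by fetching and comparing each
 * snapshot's name; no-longer-existing snapshots are skipped.
 */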
5929static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5930{
5931        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5932        u32 which;
5933        bool found = false;
5934        u64 snap_id;
5935
5936        for (which = 0; !found && which < snapc->num_snaps; which++) {
5937                const char *snap_name;
5938
5939                snap_id = snapc->snaps[which];
5940                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5941                if (IS_ERR(snap_name)) {
5942                        /* ignore no-longer existing snapshots */
5943                        if (PTR_ERR(snap_name) == -ENOENT)
5944                                continue;
5945                        else
5946                                break;
5947                }
5948                found = !strcmp(name, snap_name);
5949                kfree(snap_name);
5950        }
5951        return found ? snap_id : CEPH_NOSNAP;
5952}
5953
5954/*
5955 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5956 * no snapshot by that name is found, or if an error occurs.
5957 */
5958static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5959{
5960        if (rbd_dev->image_format == 1)
5961                return rbd_v1_snap_id_by_name(rbd_dev, name);
5962
5963        return rbd_v2_snap_id_by_name(rbd_dev, name);
5964}
5965
5966/*
5967 * An image being mapped will have everything but the snap id.
5968 */
5969static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5970{
5971        struct rbd_spec *spec = rbd_dev->spec;
5972
5973        rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5974        rbd_assert(spec->image_id && spec->image_name);
5975        rbd_assert(spec->snap_name);
5976
5977        if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5978                u64 snap_id;
5979
5980                snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5981                if (snap_id == CEPH_NOSNAP)
5982                        return -ENOENT;
5983
5984                spec->snap_id = snap_id;
5985        } else {
5986                spec->snap_id = CEPH_NOSNAP;
5987        }
5988
5989        return 0;
5990}
5991
5992/*
5993 * A parent image will have all ids but none of the names.
5994 *
5995 * All names in an rbd spec are dynamically allocated.  It's OK if we
5996 * can't figure out the name for an image id.
5997 */
5998static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5999{
6000        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6001        struct rbd_spec *spec = rbd_dev->spec;
6002        const char *pool_name;
6003        const char *image_name;
6004        const char *snap_name;
6005        int ret;
6006
6007        rbd_assert(spec->pool_id != CEPH_NOPOOL);
6008        rbd_assert(spec->image_id);
6009        rbd_assert(spec->snap_id != CEPH_NOSNAP);
6010
6011        /* Get the pool name; we have to make our own copy of this */
6012
6013        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6014        if (!pool_name) {
6015                rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6016                return -EIO;
6017        }
6018        pool_name = kstrdup(pool_name, GFP_KERNEL);
6019        if (!pool_name)
6020                return -ENOMEM;
6021
6022        /* Fetch the image name; tolerate failure here */
6023
6024        image_name = rbd_dev_image_name(rbd_dev);
6025        if (!image_name)
6026                rbd_warn(rbd_dev, "unable to get image name");
6027
6028        /* Fetch the snapshot name */
6029
6030        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6031        if (IS_ERR(snap_name)) {
6032                ret = PTR_ERR(snap_name);
6033                goto out_err;
6034        }
6035
6036        spec->pool_name = pool_name;
6037        spec->image_name = image_name;
6038        spec->snap_name = snap_name;
6039
6040        return 0;
6041
6042out_err:
6043        kfree(image_name);
6044        kfree(pool_name);
6045        return ret;
6046}
6047
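/*
 * Fetch and decode the image's snapshot context using the
 * "get_snapcontext" class method and install it in rbd_dev->header,
 * dropping the reference to the previous one.
 */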
6048static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6049{
6050        size_t size;
6051        int ret;
6052        void *reply_buf;
6053        void *p;
6054        void *end;
6055        u64 seq;
6056        u32 snap_count;
6057        struct ceph_snap_context *snapc;
6058        u32 i;
6059
6060        /*
6061         * We'll need room for the seq value (maximum snapshot id),
6062         * snapshot count, and array of that many snapshot ids.
6063         * For now we have a fixed upper limit on the number we're
6064         * prepared to receive.
6065         */
6066        size = sizeof (__le64) + sizeof (__le32) +
6067                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
6068        reply_buf = kzalloc(size, GFP_KERNEL);
6069        if (!reply_buf)
6070                return -ENOMEM;
6071
6072        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6073                                  &rbd_dev->header_oloc, "get_snapcontext",
6074                                  NULL, 0, reply_buf, size);
6075        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6076        if (ret < 0)
6077                goto out;
6078
6079        p = reply_buf;
6080        end = reply_buf + ret;
6081        ret = -ERANGE;
6082        ceph_decode_64_safe(&p, end, seq, out);
6083        ceph_decode_32_safe(&p, end, snap_count, out);
6084
6085        /*
6086         * Make sure the reported number of snapshot ids wouldn't go
6087         * beyond the end of our buffer.  But before checking that,
6088         * make sure the computed size of the snapshot context we
6089         * allocate is representable in a size_t.
6090         */
6091        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6092                                 / sizeof (u64)) {
6093                ret = -EINVAL;
6094                goto out;
6095        }
6096        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6097                goto out;
6098        ret = 0;
6099
6100        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6101        if (!snapc) {
6102                ret = -ENOMEM;
6103                goto out;
6104        }
6105        snapc->seq = seq;
6106        for (i = 0; i < snap_count; i++)
6107                snapc->snaps[i] = ceph_decode_64(&p);
6108
6109        ceph_put_snap_context(rbd_dev->header.snapc);
6110        rbd_dev->header.snapc = snapc;
6111
6112        dout("  snap context seq = %llu, snap_count = %u\n",
6113                (unsigned long long)seq, (unsigned int)snap_count);
6114out:
6115        kfree(reply_buf);
6116
6117        return ret;
6118}
6119
6120static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6121                                        u64 snap_id)
6122{
6123        size_t size;
6124        void *reply_buf;
6125        __le64 snapid;
6126        int ret;
6127        void *p;
6128        void *end;
6129        char *snap_name;
6130
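            /* The reply is a length-prefixed (le32) string: the snapshot name */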
6131        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6132        reply_buf = kmalloc(size, GFP_KERNEL);
6133        if (!reply_buf)
6134                return ERR_PTR(-ENOMEM);
6135
6136        snapid = cpu_to_le64(snap_id);
6137        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6138                                  &rbd_dev->header_oloc, "get_snapshot_name",
6139                                  &snapid, sizeof(snapid), reply_buf, size);
6140        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6141        if (ret < 0) {
6142                snap_name = ERR_PTR(ret);
6143                goto out;
6144        }
6145
6146        p = reply_buf;
6147        end = reply_buf + ret;
6148        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6149        if (IS_ERR(snap_name))
6150                goto out;
6151
6152        dout("  snap_id 0x%016llx snap_name = %s\n",
6153                (unsigned long long)snap_id, snap_name);
6154out:
6155        kfree(reply_buf);
6156
6157        return snap_name;
6158}
6159
6160static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6161{
6162        bool first_time = rbd_dev->header.object_prefix == NULL;
6163        int ret;
6164
6165        ret = rbd_dev_v2_image_size(rbd_dev);
6166        if (ret)
6167                return ret;
6168
6169        if (first_time) {
6170                ret = rbd_dev_v2_header_onetime(rbd_dev);
6171                if (ret)
6172                        return ret;
6173        }
6174
6175        ret = rbd_dev_v2_snap_context(rbd_dev);
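            /*
             * On failure during the initial probe, undo the one-time setup:
             * object_prefix doubles as the first_time indicator, so clearing
             * it makes the next call redo rbd_dev_v2_header_onetime().
             */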
6176        if (ret && first_time) {
6177                kfree(rbd_dev->header.object_prefix);
6178                rbd_dev->header.object_prefix = NULL;
6179        }
6180
6181        return ret;
6182}
6183
6184static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6185{
6186        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6187
6188        if (rbd_dev->image_format == 1)
6189                return rbd_dev_v1_header_info(rbd_dev);
6190
6191        return rbd_dev_v2_header_info(rbd_dev);
6192}
6193
6194/*
6195 * Skips over white space at *buf, and updates *buf to point to the
6196 * first found non-space character (if any). Returns the length of
6197 * the token (string of non-white space characters) found.  Note
6198 * that *buf must be terminated with '\0'.
6199 */
6200static inline size_t next_token(const char **buf)
6201{
6202        /*
6203         * These are the characters that produce nonzero for
6204         * isspace() in the "C" and "POSIX" locales.
6205         */
6206        const char *spaces = " \f\n\r\t\v";
6207
6208        *buf += strspn(*buf, spaces);   /* Find start of token */
6209
6210        return strcspn(*buf, spaces);   /* Return token length */
6211}
6212
6213/*
6214 * Finds the next token in *buf, dynamically allocates a buffer big
6215 * enough to hold a copy of it, and copies the token into the new
6216 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6217 * that a duplicate buffer is created even for a zero-length token.
6218 *
6219 * Returns a pointer to the newly-allocated duplicate, or a null
6220 * pointer if memory for the duplicate was not available.  If
6221 * the lenp argument is a non-null pointer, the length of the token
6222 * (not including the '\0') is returned in *lenp.
6223 *
6224 * If successful, the *buf pointer will be updated to point beyond
6225 * the end of the found token.
6226 *
6227 * Note: uses GFP_KERNEL for allocation.
6228 */
6229static inline char *dup_token(const char **buf, size_t *lenp)
6230{
6231        char *dup;
6232        size_t len;
6233
6234        len = next_token(buf);
6235        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6236        if (!dup)
6237                return NULL;
6238        *(dup + len) = '\0';
6239        *buf += len;
6240
6241        if (lenp)
6242                *lenp = len;
6243
6244        return dup;
6245}
6246
6247static int rbd_parse_param(struct fs_parameter *param,
6248                            struct rbd_parse_opts_ctx *pctx)
6249{
6250        struct rbd_options *opt = pctx->opts;
6251        struct fs_parse_result result;
6252        struct p_log log = {.prefix = "rbd"};
6253        int token, ret;
6254
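            /*
             * Hand the parameter to libceph first; anything it doesn't
             * recognize comes back as -ENOPARAM and is handled below.
             */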
6255        ret = ceph_parse_param(param, pctx->copts, NULL);
6256        if (ret != -ENOPARAM)
6257                return ret;
6258
6259        token = __fs_parse(&log, rbd_parameters, param, &result);
6260        dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6261        if (token < 0) {
6262                if (token == -ENOPARAM)
6263                        return inval_plog(&log, "Unknown parameter '%s'",
6264                                          param->key);
6265                return token;
6266        }
6267
6268        switch (token) {
6269        case Opt_queue_depth:
6270                if (result.uint_32 < 1)
6271                        goto out_of_range;
6272                opt->queue_depth = result.uint_32;
6273                break;
6274        case Opt_alloc_size:
6275                if (result.uint_32 < SECTOR_SIZE)
6276                        goto out_of_range;
6277                if (!is_power_of_2(result.uint_32))
6278                        return inval_plog(&log, "alloc_size must be a power of 2");
6279                opt->alloc_size = result.uint_32;
6280                break;
6281        case Opt_lock_timeout:
6282                /* 0 is "wait forever" (i.e. infinite timeout) */
6283                if (result.uint_32 > INT_MAX / 1000)
6284                        goto out_of_range;
6285                opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6286                break;
6287        case Opt_pool_ns:
6288                kfree(pctx->spec->pool_ns);
6289                pctx->spec->pool_ns = param->string;
6290                param->string = NULL;
6291                break;
6292        case Opt_compression_hint:
6293                switch (result.uint_32) {
6294                case Opt_compression_hint_none:
6295                        opt->alloc_hint_flags &=
6296                            ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6297                              CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6298                        break;
6299                case Opt_compression_hint_compressible:
6300                        opt->alloc_hint_flags |=
6301                            CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6302                        opt->alloc_hint_flags &=
6303                            ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6304                        break;
6305                case Opt_compression_hint_incompressible:
6306                        opt->alloc_hint_flags |=
6307                            CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6308                        opt->alloc_hint_flags &=
6309                            ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6310                        break;
6311                default:
6312                        BUG();
6313                }
6314                break;
6315        case Opt_read_only:
6316                opt->read_only = true;
6317                break;
6318        case Opt_read_write:
6319                opt->read_only = false;
6320                break;
6321        case Opt_lock_on_read:
6322                opt->lock_on_read = true;
6323                break;
6324        case Opt_exclusive:
6325                opt->exclusive = true;
6326                break;
6327        case Opt_notrim:
6328                opt->trim = false;
6329                break;
6330        default:
6331                BUG();
6332        }
6333
6334        return 0;
6335
6336out_of_range:
6337        return inval_plog(&log, "%s out of range", param->key);
6338}
6339
6340/*
6341 * This duplicates most of generic_parse_monolithic(), untying it from
6342 * fs_context and skipping standard superblock and security options.
6343 */
6344static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6345{
6346        char *key;
6347        int ret = 0;
6348
6349        dout("%s '%s'\n", __func__, options);
6350        while ((key = strsep(&options, ",")) != NULL) {
6351                if (*key) {
6352                        struct fs_parameter param = {
6353                                .key    = key,
6354                                .type   = fs_value_is_flag,
6355                        };
6356                        char *value = strchr(key, '=');
6357                        size_t v_len = 0;
6358
6359                        if (value) {
6360                                if (value == key)
6361                                        continue;
6362                                *value++ = 0;
6363                                v_len = strlen(value);
6364                                param.string = kmemdup_nul(value, v_len,
6365                                                           GFP_KERNEL);
6366                                if (!param.string)
6367                                        return -ENOMEM;
6368                                param.type = fs_value_is_string;
6369                        }
6370                        param.size = v_len;
6371
6372                        ret = rbd_parse_param(&param, pctx);
6373                        kfree(param.string);
6374                        if (ret)
6375                                break;
6376                }
6377        }
6378
6379        return ret;
6380}
6381
6382/*
6383 * Parse the options provided for an "rbd add" (i.e., rbd image
6384 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6385 * and the data written is passed here via a NUL-terminated buffer.
6386 * Returns 0 if successful or an error code otherwise.
6387 *
6388 * The information extracted from these options is recorded in
6389 * the other parameters which return dynamically-allocated
6390 * structures:
6391 *  ceph_opts
6392 *      The address of a pointer that will refer to a ceph options
6393 *      structure.  Caller must release the returned pointer using
6394 *      ceph_destroy_options() when it is no longer needed.
6395 *  rbd_opts
6396 *      Address of an rbd options pointer.  Fully initialized by
6397 *      this function; caller must release with kfree().
6398 *  spec
6399 *      Address of an rbd image specification pointer.  Fully
6400 *      initialized by this function based on parsed options.
6401 *      Caller must release with rbd_spec_put().
6402 *
6403 * The options passed take this form:
6404 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6405 * where:
6406 *  <mon_addrs>
6407 *      A comma-separated list of one or more monitor addresses.
6408 *      A monitor address is an ip address, optionally followed
6409 *      by a port number (separated by a colon).
6410 *        I.e.:  ip1[:port1][,ip2[:port2]...]
6411 *  <options>
6412 *      A comma-separated list of ceph and/or rbd options.
6413 *  <pool_name>
6414 *      The name of the rados pool containing the rbd image.
6415 *  <image_name>
6416 *      The name of the image in that pool to map.
6417 *  <snap_name>
6418 *      An optional snapshot name.  If provided, the mapping will
6419 *      present data from the image at the time that snapshot was
6420 *      created.  The image head is used if no snapshot name is
6421 *      provided.  Snapshot mappings are always read-only.
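     *
     * For example (illustrative names; "name" is a ceph option and
     * "queue_depth" an rbd option):
     *
     *   1.2.3.4:6789 name=admin,queue_depth=128 mypool myimage mysnap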
6422 */
6423static int rbd_add_parse_args(const char *buf,
6424                                struct ceph_options **ceph_opts,
6425                                struct rbd_options **opts,
6426                                struct rbd_spec **rbd_spec)
6427{
6428        size_t len;
6429        char *options;
6430        const char *mon_addrs;
6431        char *snap_name;
6432        size_t mon_addrs_size;
6433        struct rbd_parse_opts_ctx pctx = { 0 };
6434        int ret;
6435
6436        /* The first four tokens are required */
6437
6438        len = next_token(&buf);
6439        if (!len) {
6440                rbd_warn(NULL, "no monitor address(es) provided");
6441                return -EINVAL;
6442        }
6443        mon_addrs = buf;
6444        mon_addrs_size = len;
6445        buf += len;
6446
6447        ret = -EINVAL;
6448        options = dup_token(&buf, NULL);
6449        if (!options)
6450                return -ENOMEM;
6451        if (!*options) {
6452                rbd_warn(NULL, "no options provided");
6453                goto out_err;
6454        }
6455
6456        pctx.spec = rbd_spec_alloc();
6457        if (!pctx.spec)
6458                goto out_mem;
6459
6460        pctx.spec->pool_name = dup_token(&buf, NULL);
6461        if (!pctx.spec->pool_name)
6462                goto out_mem;
6463        if (!*pctx.spec->pool_name) {
6464                rbd_warn(NULL, "no pool name provided");
6465                goto out_err;
6466        }
6467
6468        pctx.spec->image_name = dup_token(&buf, NULL);
6469        if (!pctx.spec->image_name)
6470                goto out_mem;
6471        if (!*pctx.spec->image_name) {
6472                rbd_warn(NULL, "no image name provided");
6473                goto out_err;
6474        }
6475
6476        /*
6477         * Snapshot name is optional; default is to use "-"
6478         * (indicating the head/no snapshot).
6479         */
6480        len = next_token(&buf);
6481        if (!len) {
6482                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6483                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6484        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6485                ret = -ENAMETOOLONG;
6486                goto out_err;
6487        }
6488        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6489        if (!snap_name)
6490                goto out_mem;
6491        *(snap_name + len) = '\0';
6492        pctx.spec->snap_name = snap_name;
6493
6494        pctx.copts = ceph_alloc_options();
6495        if (!pctx.copts)
6496                goto out_mem;
6497
6498        /* Initialize all rbd options to the defaults */
6499
6500        pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6501        if (!pctx.opts)
6502                goto out_mem;
6503
6504        pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6505        pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6506        pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6507        pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6508        pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6509        pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6510        pctx.opts->trim = RBD_TRIM_DEFAULT;
6511
6512        ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
6513        if (ret)
6514                goto out_err;
6515
6516        ret = rbd_parse_options(options, &pctx);
6517        if (ret)
6518                goto out_err;
6519
6520        *ceph_opts = pctx.copts;
6521        *opts = pctx.opts;
6522        *rbd_spec = pctx.spec;
6523        kfree(options);
6524        return 0;
6525
6526out_mem:
6527        ret = -ENOMEM;
6528out_err:
6529        kfree(pctx.opts);
6530        ceph_destroy_options(pctx.copts);
6531        rbd_spec_put(pctx.spec);
6532        kfree(options);
6533        return ret;
6534}
6535
6536static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6537{
6538        down_write(&rbd_dev->lock_rwsem);
6539        if (__rbd_is_lock_owner(rbd_dev))
6540                __rbd_release_lock(rbd_dev);
6541        up_write(&rbd_dev->lock_rwsem);
6542}
6543
6544/*
6545 * If the wait is interrupted, an error is returned even if the lock
6546 * was successfully acquired.  rbd_dev_image_unlock() will release it
6547 * if needed.
6548 */
6549static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6550{
6551        long ret;
6552
6553        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6554                if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6555                        return 0;
6556
6557                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6558                return -EINVAL;
6559        }
6560
6561        if (rbd_is_ro(rbd_dev))
6562                return 0;
6563
6564        rbd_assert(!rbd_is_lock_owner(rbd_dev));
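            /*
             * Kick off lock acquisition via the delayed work and wait for it
             * to signal completion (or give up after lock_timeout).
             */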
6565        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6566        ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6567                            ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6568        if (ret > 0) {
6569                ret = rbd_dev->acquire_err;
6570        } else {
6571                cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6572                if (!ret)
6573                        ret = -ETIMEDOUT;
6574        }
6575
6576        if (ret) {
6577                rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6578                return ret;
6579        }
6580
6581        /*
6582         * The lock may have been released by now, unless automatic lock
6583         * transitions are disabled.
6584         */
6585        rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6586        return 0;
6587}
6588
6589/*
6590 * An rbd format 2 image has a unique identifier, distinct from the
6591 * name given to it by the user.  Internally, that identifier is
6592 * what's used to specify the names of objects related to the image.
6593 *
6594 * A special "rbd id" object is used to map an rbd image name to its
6595 * id.  If that object doesn't exist, then there is no v2 rbd image
6596 * with the supplied name.
6597 *
6598 * This function will fill in the given rbd_dev's image_id field if
6599 * the id can be determined, and in that case will return 0.  If any
6600 * error occurs, a negative errno will be returned and the rbd_dev's
6601 * image_id field will be unchanged (and should be NULL).
6602 */
6603static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6604{
6605        int ret;
6606        size_t size;
6607        CEPH_DEFINE_OID_ONSTACK(oid);
6608        void *response;
6609        char *image_id;
6610
6611        /*
6612         * When probing a parent image, the image id is already
6613         * known (and the image name likely is not).  There's no
6614         * need to fetch the image id again in this case.  We
6615         * do still need to set the image format though.
6616         */
6617        if (rbd_dev->spec->image_id) {
6618                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6619
6620                return 0;
6621        }
6622
6623        /*
6624         * First, see if the format 2 image id file exists, and if
6625         * so, get the image's persistent id from it.
6626         */
6627        ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6628                               rbd_dev->spec->image_name);
6629        if (ret)
6630                return ret;
6631
6632        dout("rbd id object name is %s\n", oid.name);
6633
6634        /* Response will be an encoded string, which includes a length */
6635        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6636        response = kzalloc(size, GFP_NOIO);
6637        if (!response) {
6638                ret = -ENOMEM;
6639                goto out;
6640        }
6641
6642        /* If it doesn't exist we'll assume it's a format 1 image */
6643
6644        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6645                                  "get_id", NULL, 0,
6646                                  response, size);
6647        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6648        if (ret == -ENOENT) {
6649                image_id = kstrdup("", GFP_KERNEL);
6650                ret = image_id ? 0 : -ENOMEM;
6651                if (!ret)
6652                        rbd_dev->image_format = 1;
6653        } else if (ret >= 0) {
6654                void *p = response;
6655
6656                image_id = ceph_extract_encoded_string(&p, p + ret,
6657                                                NULL, GFP_NOIO);
6658                ret = PTR_ERR_OR_ZERO(image_id);
6659                if (!ret)
6660                        rbd_dev->image_format = 2;
6661        }
6662
6663        if (!ret) {
6664                rbd_dev->spec->image_id = image_id;
6665                dout("image_id is %s\n", image_id);
6666        }
6667out:
6668        kfree(response);
6669        ceph_oid_destroy(&oid);
6670        return ret;
6671}
6672
6673/*
6674 * Undo whatever state changes are made by a v1 or v2 header info
6675 * call.
6676 */
6677static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6678{
6679        struct rbd_image_header *header;
6680
6681        rbd_dev_parent_put(rbd_dev);
6682        rbd_object_map_free(rbd_dev);
6683        rbd_dev_mapping_clear(rbd_dev);
6684
6685        /* Free dynamic fields from the header, then zero it out */
6686
6687        header = &rbd_dev->header;
6688        ceph_put_snap_context(header->snapc);
6689        kfree(header->snap_sizes);
6690        kfree(header->snap_names);
6691        kfree(header->object_prefix);
6692        memset(header, 0, sizeof (*header));
6693}
6694
6695static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6696{
6697        int ret;
6698
6699        ret = rbd_dev_v2_object_prefix(rbd_dev);
6700        if (ret)
6701                goto out_err;
6702
6703        /*
6704         * Get and check the features for the image.  Currently the
6705         * features are assumed to never change.
6706         */
6707        ret = rbd_dev_v2_features(rbd_dev);
6708        if (ret)
6709                goto out_err;
6710
6711        /* If the image supports fancy striping, get its parameters */
6712
6713        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6714                ret = rbd_dev_v2_striping_info(rbd_dev);
6715                if (ret < 0)
6716                        goto out_err;
6717        }
6718
6719        if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6720                ret = rbd_dev_v2_data_pool(rbd_dev);
6721                if (ret)
6722                        goto out_err;
6723        }
6724
6725        rbd_init_layout(rbd_dev);
6726        return 0;
6727
6728out_err:
6729        rbd_dev->header.features = 0;
6730        kfree(rbd_dev->header.object_prefix);
6731        rbd_dev->header.object_prefix = NULL;
6732        return ret;
6733}
6734
6735/*
6736 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6737 * rbd_dev_image_probe() recursion depth, which means it's also the
6738 * length of the already discovered part of the parent chain.
6739 */
6740static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6741{
6742        struct rbd_device *parent = NULL;
6743        int ret;
6744
6745        if (!rbd_dev->parent_spec)
6746                return 0;
6747
6748        if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6749                pr_info("parent chain is too long (%d)\n", depth);
6750                ret = -EINVAL;
6751                goto out_err;
6752        }
6753
6754        parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6755        if (!parent) {
6756                ret = -ENOMEM;
6757                goto out_err;
6758        }
6759
6760        /*
6761         * Images related by parent/child relationships always share
6762         * rbd_client and spec/parent_spec, so bump their refcounts.
6763         */
6764        __rbd_get_client(rbd_dev->rbd_client);
6765        rbd_spec_get(rbd_dev->parent_spec);
6766
6767        __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6768
6769        ret = rbd_dev_image_probe(parent, depth);
6770        if (ret < 0)
6771                goto out_err;
6772
6773        rbd_dev->parent = parent;
6774        atomic_set(&rbd_dev->parent_ref, 1);
6775        return 0;
6776
6777out_err:
6778        rbd_dev_unparent(rbd_dev);
6779        rbd_dev_destroy(parent);
6780        return ret;
6781}
6782
6783static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6784{
6785        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6786        rbd_free_disk(rbd_dev);
6787        if (!single_major)
6788                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6789}
6790
6791/*
6792 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6793 * upon return.
6794 */
6795static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6796{
6797        int ret;
6798
6799        /* Record our major and minor device numbers. */
6800
6801        if (!single_major) {
6802                ret = register_blkdev(0, rbd_dev->name);
6803                if (ret < 0)
6804                        goto err_out_unlock;
6805
6806                rbd_dev->major = ret;
6807                rbd_dev->minor = 0;
6808        } else {
6809                rbd_dev->major = rbd_major;
6810                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6811        }
6812
6813        /* Set up the blkdev mapping. */
6814
6815        ret = rbd_init_disk(rbd_dev);
6816        if (ret)
6817                goto err_out_blkdev;
6818
6819        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6820        set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6821
6822        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6823        if (ret)
6824                goto err_out_disk;
6825
6826        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6827        up_write(&rbd_dev->header_rwsem);
6828        return 0;
6829
6830err_out_disk:
6831        rbd_free_disk(rbd_dev);
6832err_out_blkdev:
6833        if (!single_major)
6834                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6835err_out_unlock:
6836        up_write(&rbd_dev->header_rwsem);
6837        return ret;
6838}
6839
6840static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6841{
6842        struct rbd_spec *spec = rbd_dev->spec;
6843        int ret;
6844
6845        /* Record the header object name for this rbd image. */
6846
6847        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6848        if (rbd_dev->image_format == 1)
6849                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6850                                       spec->image_name, RBD_SUFFIX);
6851        else
6852                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6853                                       RBD_HEADER_PREFIX, spec->image_id);
6854
6855        return ret;
6856}
6857
6858static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6859{
6860        if (!is_snap) {
6861                pr_info("image %s/%s%s%s does not exist\n",
6862                        rbd_dev->spec->pool_name,
6863                        rbd_dev->spec->pool_ns ?: "",
6864                        rbd_dev->spec->pool_ns ? "/" : "",
6865                        rbd_dev->spec->image_name);
6866        } else {
6867                pr_info("snap %s/%s%s%s@%s does not exist\n",
6868                        rbd_dev->spec->pool_name,
6869                        rbd_dev->spec->pool_ns ?: "",
6870                        rbd_dev->spec->pool_ns ? "/" : "",
6871                        rbd_dev->spec->image_name,
6872                        rbd_dev->spec->snap_name);
6873        }
6874}
6875
6876static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6877{
6878        if (!rbd_is_ro(rbd_dev))
6879                rbd_unregister_watch(rbd_dev);
6880
6881        rbd_dev_unprobe(rbd_dev);
6882        rbd_dev->image_format = 0;
6883        kfree(rbd_dev->spec->image_id);
6884        rbd_dev->spec->image_id = NULL;
6885}
6886
6887/*
6888 * Probe for the existence of the header object for the given rbd
6889 * device.  If this image is the one being mapped (i.e., not a
6890 * parent), initiate a watch on its header object before using that
6891 * object to get detailed information about the rbd image.
6892 *
6893 * On success, returns with header_rwsem held for write if called
6894 * with @depth == 0.
6895 */
6896static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6897{
6898        bool need_watch = !rbd_is_ro(rbd_dev);
6899        int ret;
6900
6901        /*
6902         * Get the id from the image id object.  Unless there's an
6903         * error, rbd_dev->spec->image_id will be filled in with
6904         * a dynamically-allocated string, and rbd_dev->image_format
6905         * will be set to either 1 or 2.
6906         */
6907        ret = rbd_dev_image_id(rbd_dev);
6908        if (ret)
6909                return ret;
6910
6911        ret = rbd_dev_header_name(rbd_dev);
6912        if (ret)
6913                goto err_out_format;
6914
6915        if (need_watch) {
6916                ret = rbd_register_watch(rbd_dev);
6917                if (ret) {
6918                        if (ret == -ENOENT)
6919                                rbd_print_dne(rbd_dev, false);
6920                        goto err_out_format;
6921                }
6922        }
6923
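            /*
             * Only the image being mapped (depth == 0) takes header_rwsem;
             * on success it stays held, as noted in the comment above.
             */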
6924        if (!depth)
6925                down_write(&rbd_dev->header_rwsem);
6926
6927        ret = rbd_dev_header_info(rbd_dev);
6928        if (ret) {
6929                if (ret == -ENOENT && !need_watch)
6930                        rbd_print_dne(rbd_dev, false);
6931                goto err_out_probe;
6932        }
6933
6934        /*
6935         * If this image is the one being mapped, we have pool name and
6936         * id, image name and id, and snap name - need to fill snap id.
6937         * Otherwise this is a parent image, identified by pool, image
6938         * and snap ids - need to fill in names for those ids.
6939         */
6940        if (!depth)
6941                ret = rbd_spec_fill_snap_id(rbd_dev);
6942        else
6943                ret = rbd_spec_fill_names(rbd_dev);
6944        if (ret) {
6945                if (ret == -ENOENT)
6946                        rbd_print_dne(rbd_dev, true);
6947                goto err_out_probe;
6948        }
6949
6950        ret = rbd_dev_mapping_set(rbd_dev);
6951        if (ret)
6952                goto err_out_probe;
6953
6954        if (rbd_is_snap(rbd_dev) &&
6955            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6956                ret = rbd_object_map_load(rbd_dev);
6957                if (ret)
6958                        goto err_out_probe;
6959        }
6960
6961        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6962                ret = rbd_dev_v2_parent_info(rbd_dev);
6963                if (ret)
6964                        goto err_out_probe;
6965        }
6966
6967        ret = rbd_dev_probe_parent(rbd_dev, depth);
6968        if (ret)
6969                goto err_out_probe;
6970
6971        dout("discovered format %u image, header name is %s\n",
6972                rbd_dev->image_format, rbd_dev->header_oid.name);
6973        return 0;
6974
6975err_out_probe:
6976        if (!depth)
6977                up_write(&rbd_dev->header_rwsem);
6978        if (need_watch)
6979                rbd_unregister_watch(rbd_dev);
6980        rbd_dev_unprobe(rbd_dev);
6981err_out_format:
6982        rbd_dev->image_format = 0;
6983        kfree(rbd_dev->spec->image_id);
6984        rbd_dev->spec->image_id = NULL;
6985        return ret;
6986}
6987
6988static ssize_t do_rbd_add(struct bus_type *bus,
6989                          const char *buf,
6990                          size_t count)
6991{
6992        struct rbd_device *rbd_dev = NULL;
6993        struct ceph_options *ceph_opts = NULL;
6994        struct rbd_options *rbd_opts = NULL;
6995        struct rbd_spec *spec = NULL;
6996        struct rbd_client *rbdc;
6997        int rc;
6998
6999        if (!capable(CAP_SYS_ADMIN))
7000                return -EPERM;
7001
7002        if (!try_module_get(THIS_MODULE))
7003                return -ENODEV;
7004
7005        /* parse add command */
7006        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7007        if (rc < 0)
7008                goto out;
7009
7010        rbdc = rbd_get_client(ceph_opts);
7011        if (IS_ERR(rbdc)) {
7012                rc = PTR_ERR(rbdc);
7013                goto err_out_args;
7014        }
7015
7016        /* pick the pool */
7017        rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7018        if (rc < 0) {
7019                if (rc == -ENOENT)
7020                        pr_info("pool %s does not exist\n", spec->pool_name);
7021                goto err_out_client;
7022        }
7023        spec->pool_id = (u64)rc;
7024
7025        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7026        if (!rbd_dev) {
7027                rc = -ENOMEM;
7028                goto err_out_client;
7029        }
7030        rbdc = NULL;            /* rbd_dev now owns this */
7031        spec = NULL;            /* rbd_dev now owns this */
7032        rbd_opts = NULL;        /* rbd_dev now owns this */
7033
7034        /* if we are mapping a snapshot it will be a read-only mapping */
7035        if (rbd_dev->opts->read_only ||
7036            strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7037                __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7038
7039        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7040        if (!rbd_dev->config_info) {
7041                rc = -ENOMEM;
7042                goto err_out_rbd_dev;
7043        }
7044
7045        rc = rbd_dev_image_probe(rbd_dev, 0);
7046        if (rc < 0)
7047                goto err_out_rbd_dev;
7048
7049        if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7050                rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7051                         rbd_dev->layout.object_size);
7052                rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7053        }
7054
7055        rc = rbd_dev_device_setup(rbd_dev);
7056        if (rc)
7057                goto err_out_image_probe;
7058
7059        rc = rbd_add_acquire_lock(rbd_dev);
7060        if (rc)
7061                goto err_out_image_lock;
7062
7063        /* Everything's ready.  Announce the disk to the world. */
7064
7065        rc = device_add(&rbd_dev->dev);
7066        if (rc)
7067                goto err_out_image_lock;
7068
7069        device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7070
7071        spin_lock(&rbd_dev_list_lock);
7072        list_add_tail(&rbd_dev->node, &rbd_dev_list);
7073        spin_unlock(&rbd_dev_list_lock);
7074
7075        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7076                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7077                rbd_dev->header.features);
7078        rc = count;
7079out:
7080        module_put(THIS_MODULE);
7081        return rc;
7082
7083err_out_image_lock:
7084        rbd_dev_image_unlock(rbd_dev);
7085        rbd_dev_device_release(rbd_dev);
7086err_out_image_probe:
7087        rbd_dev_image_release(rbd_dev);
7088err_out_rbd_dev:
7089        rbd_dev_destroy(rbd_dev);
7090err_out_client:
7091        rbd_put_client(rbdc);
7092err_out_args:
7093        rbd_spec_put(spec);
7094        kfree(rbd_opts);
7095        goto out;
7096}
7097
7098static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7099{
7100        if (single_major)
7101                return -EINVAL;
7102
7103        return do_rbd_add(bus, buf, count);
7104}
7105
7106static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7107                                      size_t count)
7108{
7109        return do_rbd_add(bus, buf, count);
7110}
7111
7112static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7113{
7114        while (rbd_dev->parent) {
7115                struct rbd_device *first = rbd_dev;
7116                struct rbd_device *second = first->parent;
7117                struct rbd_device *third;
7118
7119                /*
7120                 * Walk down to the parent that has no grandparent
7121                 * and remove it.
7122                 */
7123                while (second && (third = second->parent)) {
7124                        first = second;
7125                        second = third;
7126                }
7127                rbd_assert(second);
7128                rbd_dev_image_release(second);
7129                rbd_dev_destroy(second);
7130                first->parent = NULL;
7131                first->parent_overlap = 0;
7132
7133                rbd_assert(first->parent_spec);
7134                rbd_spec_put(first->parent_spec);
7135                first->parent_spec = NULL;
7136        }
7137}
7138
7139static ssize_t do_rbd_remove(struct bus_type *bus,
7140                             const char *buf,
7141                             size_t count)
7142{
7143        struct rbd_device *rbd_dev = NULL;
7144        struct list_head *tmp;
7145        int dev_id;
7146        char opt_buf[6];
7147        bool force = false;
7148        int ret;
7149
7150        if (!capable(CAP_SYS_ADMIN))
7151                return -EPERM;
7152
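            /* The data written is "<dev_id>", optionally followed by " force" */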
7153        dev_id = -1;
7154        opt_buf[0] = '\0';
7155        sscanf(buf, "%d %5s", &dev_id, opt_buf);
7156        if (dev_id < 0) {
7157                pr_err("dev_id out of range\n");
7158                return -EINVAL;
7159        }
7160        if (opt_buf[0] != '\0') {
7161                if (!strcmp(opt_buf, "force")) {
7162                        force = true;
7163                } else {
7164                        pr_err("bad remove option at '%s'\n", opt_buf);
7165                        return -EINVAL;
7166                }
7167        }
7168
7169        ret = -ENOENT;
7170        spin_lock(&rbd_dev_list_lock);
7171        list_for_each(tmp, &rbd_dev_list) {
7172                rbd_dev = list_entry(tmp, struct rbd_device, node);
7173                if (rbd_dev->dev_id == dev_id) {
7174                        ret = 0;
7175                        break;
7176                }
7177        }
7178        if (!ret) {
7179                spin_lock_irq(&rbd_dev->lock);
7180                if (rbd_dev->open_count && !force)
7181                        ret = -EBUSY;
7182                else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7183                                          &rbd_dev->flags))
7184                        ret = -EINPROGRESS;
7185                spin_unlock_irq(&rbd_dev->lock);
7186        }
7187        spin_unlock(&rbd_dev_list_lock);
7188        if (ret)
7189                return ret;
7190
7191        if (force) {
7192                /*
7193                 * Prevent new IO from being queued and wait for existing
7194                 * IO to complete/fail.
7195                 */
7196                blk_mq_freeze_queue(rbd_dev->disk->queue);
7197                blk_set_queue_dying(rbd_dev->disk->queue);
7198        }
7199
7200        del_gendisk(rbd_dev->disk);
7201        spin_lock(&rbd_dev_list_lock);
7202        list_del_init(&rbd_dev->node);
7203        spin_unlock(&rbd_dev_list_lock);
7204        device_del(&rbd_dev->dev);
7205
7206        rbd_dev_image_unlock(rbd_dev);
7207        rbd_dev_device_release(rbd_dev);
7208        rbd_dev_image_release(rbd_dev);
7209        rbd_dev_destroy(rbd_dev);
7210        return count;
7211}
7212
7213static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7214{
7215        if (single_major)
7216                return -EINVAL;
7217
7218        return do_rbd_remove(bus, buf, count);
7219}
7220
7221static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7222                                         size_t count)
7223{
7224        return do_rbd_remove(bus, buf, count);
7225}
7226
7227/*
7228 * create control files in sysfs
7229 * /sys/bus/rbd/...
7230 */
7231static int __init rbd_sysfs_init(void)
7232{
7233        int ret;
7234
7235        ret = device_register(&rbd_root_dev);
7236        if (ret < 0)
7237                return ret;
7238
7239        ret = bus_register(&rbd_bus_type);
7240        if (ret < 0)
7241                device_unregister(&rbd_root_dev);
7242
7243        return ret;
7244}
7245
7246static void __exit rbd_sysfs_cleanup(void)
7247{
7248        bus_unregister(&rbd_bus_type);
7249        device_unregister(&rbd_root_dev);
7250}
7251
7252static int __init rbd_slab_init(void)
7253{
7254        rbd_assert(!rbd_img_request_cache);
7255        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7256        if (!rbd_img_request_cache)
7257                return -ENOMEM;
7258
7259        rbd_assert(!rbd_obj_request_cache);
7260        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7261        if (!rbd_obj_request_cache)
7262                goto out_err;
7263
7264        return 0;
7265
7266out_err:
7267        kmem_cache_destroy(rbd_img_request_cache);
7268        rbd_img_request_cache = NULL;
7269        return -ENOMEM;
7270}
7271
7272static void rbd_slab_exit(void)
7273{
7274        rbd_assert(rbd_obj_request_cache);
7275        kmem_cache_destroy(rbd_obj_request_cache);
7276        rbd_obj_request_cache = NULL;
7277
7278        rbd_assert(rbd_img_request_cache);
7279        kmem_cache_destroy(rbd_img_request_cache);
7280        rbd_img_request_cache = NULL;
7281}
7282
7283static int __init rbd_init(void)
7284{
7285        int rc;
7286
7287        if (!libceph_compatible(NULL)) {
7288                rbd_warn(NULL, "libceph incompatibility (quitting)");
7289                return -EINVAL;
7290        }
7291
7292        rc = rbd_slab_init();
7293        if (rc)
7294                return rc;
7295
7296        /*
7297         * The number of active work items is limited by the number of
7298         * rbd devices * queue depth, so leave @max_active at default.
7299         */
7300        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7301        if (!rbd_wq) {
7302                rc = -ENOMEM;
7303                goto err_out_slab;
7304        }
7305
7306        if (single_major) {
7307                rbd_major = register_blkdev(0, RBD_DRV_NAME);
7308                if (rbd_major < 0) {
7309                        rc = rbd_major;
7310                        goto err_out_wq;
7311                }
7312        }
7313
7314        rc = rbd_sysfs_init();
7315        if (rc)
7316                goto err_out_blkdev;
7317
7318        if (single_major)
7319                pr_info("loaded (major %d)\n", rbd_major);
7320        else
7321                pr_info("loaded\n");
7322
7323        return 0;
7324
7325err_out_blkdev:
7326        if (single_major)
7327                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7328err_out_wq:
7329        destroy_workqueue(rbd_wq);
7330err_out_slab:
7331        rbd_slab_exit();
7332        return rc;
7333}
7334
7335static void __exit rbd_exit(void)
7336{
7337        ida_destroy(&rbd_dev_id_ida);
7338        rbd_sysfs_cleanup();
7339        if (single_major)
7340                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7341        destroy_workqueue(rbd_wq);
7342        rbd_slab_exit();
7343}
7344
7345module_init(rbd_init);
7346module_exit(rbd_exit);
7347
7348MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7349MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7350MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7351/* following authorship retained from original osdblk.c */
7352MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7353
7354MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7355MODULE_LICENSE("GPL");
7356