linux/drivers/block/rbd.c
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/cls_lock_client.h>
  35#include <linux/ceph/striper.h>
  36#include <linux/ceph/decode.h>
  37#include <linux/fs_parser.h>
  38#include <linux/bsearch.h>
  39
  40#include <linux/kernel.h>
  41#include <linux/device.h>
  42#include <linux/module.h>
  43#include <linux/blk-mq.h>
  44#include <linux/fs.h>
  45#include <linux/blkdev.h>
  46#include <linux/slab.h>
  47#include <linux/idr.h>
  48#include <linux/workqueue.h>
  49
  50#include "rbd_types.h"
  51
  52#define RBD_DEBUG       /* Activate rbd_assert() calls */
  53
  54/*
  55 * Increment the given counter and return its updated value.
  56 * If the counter is already 0 it will not be incremented.
   57 * If the counter is already at its maximum value, -EINVAL is
   58 * returned without updating it.
  59 */
  60static int atomic_inc_return_safe(atomic_t *v)
  61{
  62        unsigned int counter;
  63
  64        counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
  65        if (counter <= (unsigned int)INT_MAX)
  66                return (int)counter;
  67
  68        atomic_dec(v);
  69
  70        return -EINVAL;
  71}
  72
  73/* Decrement the counter.  Return the resulting value, or -EINVAL */
  74static int atomic_dec_return_safe(atomic_t *v)
  75{
  76        int counter;
  77
  78        counter = atomic_dec_return(v);
  79        if (counter >= 0)
  80                return counter;
  81
  82        atomic_inc(v);
  83
  84        return -EINVAL;
  85}
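     /*
      * Illustrative sketch (not part of the driver source, just an
      * example): parent_ref, defined further down, is managed with this
      * pair.  atomic_fetch_add_unless(v, 1, 0) never bumps a counter
      * that has already dropped to 0, so a caller doing
      *
      *     if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
      *             // reference taken, parent data may be used
      *
      * gets 0 for an already torn-down parent rather than a new
      * reference, and an over- or underflowed counter shows up as
      * -EINVAL from either helper.
      */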
  86
  87#define RBD_DRV_NAME "rbd"
  88
  89#define RBD_MINORS_PER_MAJOR            256
  90#define RBD_SINGLE_MAJOR_PART_SHIFT     4
  91
  92#define RBD_MAX_PARENT_CHAIN_LEN        16
  93
  94#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
  95#define RBD_MAX_SNAP_NAME_LEN   \
  96                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
  97
  98#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
  99
 100#define RBD_SNAP_HEAD_NAME      "-"
 101
 102#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
 103
 104/* This allows a single page to hold an image name sent by OSD */
 105#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
 106#define RBD_IMAGE_ID_LEN_MAX    64
 107
 108#define RBD_OBJ_PREFIX_LEN_MAX  64
 109
 110#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
 111#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
 112
 113/* Feature bits */
 114
 115#define RBD_FEATURE_LAYERING            (1ULL<<0)
 116#define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
 117#define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
 118#define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
 119#define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
 120#define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
 121#define RBD_FEATURE_DATA_POOL           (1ULL<<7)
 122#define RBD_FEATURE_OPERATIONS          (1ULL<<8)
 123
 124#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
 125                                 RBD_FEATURE_STRIPINGV2 |       \
 126                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
 127                                 RBD_FEATURE_OBJECT_MAP |       \
 128                                 RBD_FEATURE_FAST_DIFF |        \
 129                                 RBD_FEATURE_DEEP_FLATTEN |     \
 130                                 RBD_FEATURE_DATA_POOL |        \
 131                                 RBD_FEATURE_OPERATIONS)
 132
 133/* Features supported by this (client software) implementation. */
 134
 135#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
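     /*
      * With the bits defined above this works out to 0x1bf (bits 0-5, 7
      * and 8; bit 6 is not used here), which is the value userspace reads
      * from /sys/bus/rbd/supported_features via supported_features_show()
      * below.
      */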
 136
 137/*
 138 * An RBD device name will be "rbd#", where the "rbd" comes from
 139 * RBD_DRV_NAME above, and # is a unique integer identifier.
 140 */
 141#define DEV_NAME_LEN            32
 142
 143/*
 144 * block device image metadata (in-memory version)
 145 */
 146struct rbd_image_header {
 147        /* These six fields never change for a given rbd image */
 148        char *object_prefix;
 149        __u8 obj_order;
 150        u64 stripe_unit;
 151        u64 stripe_count;
 152        s64 data_pool_id;
 153        u64 features;           /* Might be changeable someday? */
 154
 155        /* The remaining fields need to be updated occasionally */
 156        u64 image_size;
 157        struct ceph_snap_context *snapc;
 158        char *snap_names;       /* format 1 only */
 159        u64 *snap_sizes;        /* format 1 only */
 160};
 161
 162/*
 163 * An rbd image specification.
 164 *
 165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 166 * identify an image.  Each rbd_dev structure includes a pointer to
 167 * an rbd_spec structure that encapsulates this identity.
 168 *
 169 * Each of the id's in an rbd_spec has an associated name.  For a
 170 * user-mapped image, the names are supplied and the id's associated
 171 * with them are looked up.  For a layered image, a parent image is
 172 * defined by the tuple, and the names are looked up.
 173 *
 174 * An rbd_dev structure contains a parent_spec pointer which is
 175 * non-null if the image it represents is a child in a layered
 176 * image.  This pointer will refer to the rbd_spec structure used
 177 * by the parent rbd_dev for its own identity (i.e., the structure
 178 * is shared between the parent and child).
 179 *
 180 * Since these structures are populated once, during the discovery
 181 * phase of image construction, they are effectively immutable so
 182 * we make no effort to synchronize access to them.
 183 *
 184 * Note that code herein does not assume the image name is known (it
 185 * could be a null pointer).
 186 */
 187struct rbd_spec {
 188        u64             pool_id;
 189        const char      *pool_name;
 190        const char      *pool_ns;       /* NULL if default, never "" */
 191
 192        const char      *image_id;
 193        const char      *image_name;
 194
 195        u64             snap_id;
 196        const char      *snap_name;
 197
 198        struct kref     kref;
 199};
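     /*
      * Illustrative sketch (hypothetical values, not taken from a real
      * image): mapping "mypool/myimg@snap1" ends up with a spec roughly
      * like
      *
      *     spec->pool_name  = "mypool";   spec->pool_id  = 2;
      *     spec->image_name = "myimg";    spec->image_id = "1018e26b8b4567";
      *     spec->snap_name  = "snap1";    spec->snap_id  = 4;
      *
      * where the names come from the map request and the ids are looked
      * up during probe.  A clone's rbd_dev->parent_spec points at the
      * spec its parent rbd_dev uses for its own identity, so the two
      * devices share one refcounted structure.
      */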
 200
 201/*
 202 * an instance of the client.  multiple devices may share an rbd client.
 203 */
 204struct rbd_client {
 205        struct ceph_client      *client;
 206        struct kref             kref;
 207        struct list_head        node;
 208};
 209
 210struct pending_result {
 211        int                     result;         /* first nonzero result */
 212        int                     num_pending;
 213};
 214
 215struct rbd_img_request;
 216
 217enum obj_request_type {
 218        OBJ_REQUEST_NODATA = 1,
 219        OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
 220        OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
 221        OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
 222};
 223
 224enum obj_operation_type {
 225        OBJ_OP_READ = 1,
 226        OBJ_OP_WRITE,
 227        OBJ_OP_DISCARD,
 228        OBJ_OP_ZEROOUT,
 229};
 230
 231#define RBD_OBJ_FLAG_DELETION                   (1U << 0)
 232#define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
 233#define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
 234#define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
 235#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
 236
 237enum rbd_obj_read_state {
 238        RBD_OBJ_READ_START = 1,
 239        RBD_OBJ_READ_OBJECT,
 240        RBD_OBJ_READ_PARENT,
 241};
 242
 243/*
 244 * Writes go through the following state machine to deal with
 245 * layering:
 246 *
 247 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 248 *            .                 |                                    .
 249 *            .                 v                                    .
 250 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 251 *            .                 |                    .               .
 252 *            .                 v                    v (deep-copyup  .
 253 *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
 254 * flattened) v                 |                    .               .
 255 *            .                 v                    .               .
 256 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 257 *                              |                        not needed) v
 258 *                              v                                    .
 259 *                            done . . . . . . . . . . . . . . . . . .
 260 *                              ^
 261 *                              |
 262 *                     RBD_OBJ_WRITE_FLAT
 263 *
 264 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 265 * assert_exists guard is needed or not (in some cases it's not needed
 266 * even if there is a parent).
 267 */
 268enum rbd_obj_write_state {
 269        RBD_OBJ_WRITE_START = 1,
 270        RBD_OBJ_WRITE_PRE_OBJECT_MAP,
 271        RBD_OBJ_WRITE_OBJECT,
 272        __RBD_OBJ_WRITE_COPYUP,
 273        RBD_OBJ_WRITE_COPYUP,
 274        RBD_OBJ_WRITE_POST_OBJECT_MAP,
 275};
 276
 277enum rbd_obj_copyup_state {
 278        RBD_OBJ_COPYUP_START = 1,
 279        RBD_OBJ_COPYUP_READ_PARENT,
 280        __RBD_OBJ_COPYUP_OBJECT_MAPS,
 281        RBD_OBJ_COPYUP_OBJECT_MAPS,
 282        __RBD_OBJ_COPYUP_WRITE_OBJECT,
 283        RBD_OBJ_COPYUP_WRITE_OBJECT,
 284};
 285
 286struct rbd_obj_request {
 287        struct ceph_object_extent ex;
 288        unsigned int            flags;  /* RBD_OBJ_FLAG_* */
 289        union {
 290                enum rbd_obj_read_state  read_state;    /* for reads */
 291                enum rbd_obj_write_state write_state;   /* for writes */
 292        };
 293
 294        struct rbd_img_request  *img_request;
 295        struct ceph_file_extent *img_extents;
 296        u32                     num_img_extents;
 297
 298        union {
 299                struct ceph_bio_iter    bio_pos;
 300                struct {
 301                        struct ceph_bvec_iter   bvec_pos;
 302                        u32                     bvec_count;
 303                        u32                     bvec_idx;
 304                };
 305        };
 306
 307        enum rbd_obj_copyup_state copyup_state;
 308        struct bio_vec          *copyup_bvecs;
 309        u32                     copyup_bvec_count;
 310
 311        struct list_head        osd_reqs;       /* w/ r_private_item */
 312
 313        struct mutex            state_mutex;
 314        struct pending_result   pending;
 315        struct kref             kref;
 316};
 317
 318enum img_req_flags {
 319        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
 320        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 321};
 322
 323enum rbd_img_state {
 324        RBD_IMG_START = 1,
 325        RBD_IMG_EXCLUSIVE_LOCK,
 326        __RBD_IMG_OBJECT_REQUESTS,
 327        RBD_IMG_OBJECT_REQUESTS,
 328};
 329
 330struct rbd_img_request {
 331        struct rbd_device       *rbd_dev;
 332        enum obj_operation_type op_type;
 333        enum obj_request_type   data_type;
 334        unsigned long           flags;
 335        enum rbd_img_state      state;
 336        union {
 337                u64                     snap_id;        /* for reads */
 338                struct ceph_snap_context *snapc;        /* for writes */
 339        };
 340        struct rbd_obj_request  *obj_request;   /* obj req initiator */
 341
 342        struct list_head        lock_item;
 343        struct list_head        object_extents; /* obj_req.ex structs */
 344
 345        struct mutex            state_mutex;
 346        struct pending_result   pending;
 347        struct work_struct      work;
 348        int                     work_result;
 349};
 350
 351#define for_each_obj_request(ireq, oreq) \
 352        list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 353#define for_each_obj_request_safe(ireq, oreq, n) \
 354        list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 355
 356enum rbd_watch_state {
 357        RBD_WATCH_STATE_UNREGISTERED,
 358        RBD_WATCH_STATE_REGISTERED,
 359        RBD_WATCH_STATE_ERROR,
 360};
 361
 362enum rbd_lock_state {
 363        RBD_LOCK_STATE_UNLOCKED,
 364        RBD_LOCK_STATE_LOCKED,
 365        RBD_LOCK_STATE_RELEASING,
 366};
 367
 368/* WatchNotify::ClientId */
 369struct rbd_client_id {
 370        u64 gid;
 371        u64 handle;
 372};
 373
 374struct rbd_mapping {
 375        u64                     size;
 376};
 377
 378/*
 379 * a single device
 380 */
 381struct rbd_device {
 382        int                     dev_id;         /* blkdev unique id */
 383
 384        int                     major;          /* blkdev assigned major */
 385        int                     minor;
 386        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 387
 388        u32                     image_format;   /* Either 1 or 2 */
 389        struct rbd_client       *rbd_client;
 390
 391        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 392
 393        spinlock_t              lock;           /* queue, flags, open_count */
 394
 395        struct rbd_image_header header;
 396        unsigned long           flags;          /* possibly lock protected */
 397        struct rbd_spec         *spec;
 398        struct rbd_options      *opts;
 399        char                    *config_info;   /* add{,_single_major} string */
 400
 401        struct ceph_object_id   header_oid;
 402        struct ceph_object_locator header_oloc;
 403
 404        struct ceph_file_layout layout;         /* used for all rbd requests */
 405
 406        struct mutex            watch_mutex;
 407        enum rbd_watch_state    watch_state;
 408        struct ceph_osd_linger_request *watch_handle;
 409        u64                     watch_cookie;
 410        struct delayed_work     watch_dwork;
 411
 412        struct rw_semaphore     lock_rwsem;
 413        enum rbd_lock_state     lock_state;
 414        char                    lock_cookie[32];
 415        struct rbd_client_id    owner_cid;
 416        struct work_struct      acquired_lock_work;
 417        struct work_struct      released_lock_work;
 418        struct delayed_work     lock_dwork;
 419        struct work_struct      unlock_work;
 420        spinlock_t              lock_lists_lock;
 421        struct list_head        acquiring_list;
 422        struct list_head        running_list;
 423        struct completion       acquire_wait;
 424        int                     acquire_err;
 425        struct completion       releasing_wait;
 426
 427        spinlock_t              object_map_lock;
 428        u8                      *object_map;
 429        u64                     object_map_size;        /* in objects */
 430        u64                     object_map_flags;
 431
 432        struct workqueue_struct *task_wq;
 433
 434        struct rbd_spec         *parent_spec;
 435        u64                     parent_overlap;
 436        atomic_t                parent_ref;
 437        struct rbd_device       *parent;
 438
 439        /* Block layer tags. */
 440        struct blk_mq_tag_set   tag_set;
 441
 442        /* protects updating the header */
 443        struct rw_semaphore     header_rwsem;
 444
 445        struct rbd_mapping      mapping;
 446
 447        struct list_head        node;
 448
 449        /* sysfs related */
 450        struct device           dev;
 451        unsigned long           open_count;     /* protected by lock */
 452};
 453
 454/*
 455 * Flag bits for rbd_dev->flags:
 456 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 457 *   by rbd_dev->lock
 458 */
 459enum rbd_dev_flags {
 460        RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
 461        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 462        RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
 463};
 464
 465static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 466
 467static LIST_HEAD(rbd_dev_list);    /* devices */
 468static DEFINE_SPINLOCK(rbd_dev_list_lock);
 469
 470static LIST_HEAD(rbd_client_list);              /* clients */
 471static DEFINE_SPINLOCK(rbd_client_list_lock);
 472
 473/* Slab caches for frequently-allocated structures */
 474
 475static struct kmem_cache        *rbd_img_request_cache;
 476static struct kmem_cache        *rbd_obj_request_cache;
 477
 478static int rbd_major;
 479static DEFINE_IDA(rbd_dev_id_ida);
 480
 481static struct workqueue_struct *rbd_wq;
 482
 483static struct ceph_snap_context rbd_empty_snapc = {
 484        .nref = REFCOUNT_INIT(1),
 485};
 486
 487/*
  488 * single-major requires version 0.75 or later of the userspace rbd utility.
 489 */
 490static bool single_major = true;
 491module_param(single_major, bool, 0444);
 492MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 493
 494static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
 495static ssize_t remove_store(struct bus_type *bus, const char *buf,
 496                            size_t count);
 497static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
 498                                      size_t count);
 499static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
 500                                         size_t count);
 501static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 502
 503static int rbd_dev_id_to_minor(int dev_id)
 504{
 505        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
 506}
 507
 508static int minor_to_rbd_dev_id(int minor)
 509{
 510        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 511}
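     /*
      * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4 each mapping
      * owns 16 consecutive minors (the whole disk plus up to 15
      * partitions), so
      *
      *     rbd_dev_id_to_minor(3)  == 48
      *     minor_to_rbd_dev_id(53) == 3     (partition 5 of rbd3)
      */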
 512
 513static bool rbd_is_ro(struct rbd_device *rbd_dev)
 514{
 515        return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
 516}
 517
 518static bool rbd_is_snap(struct rbd_device *rbd_dev)
 519{
 520        return rbd_dev->spec->snap_id != CEPH_NOSNAP;
 521}
 522
 523static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 524{
 525        lockdep_assert_held(&rbd_dev->lock_rwsem);
 526
 527        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 528               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 529}
 530
 531static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
 532{
 533        bool is_lock_owner;
 534
 535        down_read(&rbd_dev->lock_rwsem);
 536        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
 537        up_read(&rbd_dev->lock_rwsem);
 538        return is_lock_owner;
 539}
 540
 541static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 542{
 543        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 544}
 545
 546static BUS_ATTR_WO(add);
 547static BUS_ATTR_WO(remove);
 548static BUS_ATTR_WO(add_single_major);
 549static BUS_ATTR_WO(remove_single_major);
 550static BUS_ATTR_RO(supported_features);
 551
 552static struct attribute *rbd_bus_attrs[] = {
 553        &bus_attr_add.attr,
 554        &bus_attr_remove.attr,
 555        &bus_attr_add_single_major.attr,
 556        &bus_attr_remove_single_major.attr,
 557        &bus_attr_supported_features.attr,
 558        NULL,
 559};
 560
 561static umode_t rbd_bus_is_visible(struct kobject *kobj,
 562                                  struct attribute *attr, int index)
 563{
 564        if (!single_major &&
 565            (attr == &bus_attr_add_single_major.attr ||
 566             attr == &bus_attr_remove_single_major.attr))
 567                return 0;
 568
 569        return attr->mode;
 570}
 571
 572static const struct attribute_group rbd_bus_group = {
 573        .attrs = rbd_bus_attrs,
 574        .is_visible = rbd_bus_is_visible,
 575};
 576__ATTRIBUTE_GROUPS(rbd_bus);
 577
 578static struct bus_type rbd_bus_type = {
 579        .name           = "rbd",
 580        .bus_groups     = rbd_bus_groups,
 581};
 582
 583static void rbd_root_dev_release(struct device *dev)
 584{
 585}
 586
 587static struct device rbd_root_dev = {
 588        .init_name =    "rbd",
 589        .release =      rbd_root_dev_release,
 590};
 591
 592static __printf(2, 3)
 593void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 594{
 595        struct va_format vaf;
 596        va_list args;
 597
 598        va_start(args, fmt);
 599        vaf.fmt = fmt;
 600        vaf.va = &args;
 601
 602        if (!rbd_dev)
 603                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 604        else if (rbd_dev->disk)
 605                printk(KERN_WARNING "%s: %s: %pV\n",
 606                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 607        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 608                printk(KERN_WARNING "%s: image %s: %pV\n",
 609                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 610        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 611                printk(KERN_WARNING "%s: id %s: %pV\n",
 612                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 613        else    /* punt */
 614                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 615                        RBD_DRV_NAME, rbd_dev, &vaf);
 616        va_end(args);
 617}
 618
 619#ifdef RBD_DEBUG
 620#define rbd_assert(expr)                                                \
 621                if (unlikely(!(expr))) {                                \
 622                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 623                                                "at line %d:\n\n"       \
 624                                        "\trbd_assert(%s);\n\n",        \
 625                                        __func__, __LINE__, #expr);     \
 626                        BUG();                                          \
 627                }
 628#else /* !RBD_DEBUG */
 629#  define rbd_assert(expr)      ((void) 0)
 630#endif /* !RBD_DEBUG */
 631
 632static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 633
 634static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 635static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
 636static int rbd_dev_header_info(struct rbd_device *rbd_dev);
 637static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 638static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 639                                        u64 snap_id);
 640static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 641                                u8 *order, u64 *snap_size);
 642static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
 643
 644static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
 645static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
 646
 647/*
 648 * Return true if nothing else is pending.
 649 */
 650static bool pending_result_dec(struct pending_result *pending, int *result)
 651{
 652        rbd_assert(pending->num_pending > 0);
 653
 654        if (*result && !pending->result)
 655                pending->result = *result;
 656        if (--pending->num_pending)
 657                return false;
 658
 659        *result = pending->result;
 660        return true;
 661}
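     /*
      * Illustrative usage sketch (hypothetical caller, not part of the
      * driver): a request that fans out into N child actions sets
      * pending->num_pending = N before submitting them, and each child's
      * completion path then does roughly
      *
      *     if (pending_result_dec(&req->pending, &result))
      *             finish_parent(req, result);
      *
      * Only the last completion sees true, and by then result holds the
      * first nonzero result reported by any child (or 0 if all of them
      * succeeded).
      */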
 662
 663static int rbd_open(struct block_device *bdev, fmode_t mode)
 664{
 665        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 666        bool removing = false;
 667
 668        spin_lock_irq(&rbd_dev->lock);
 669        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 670                removing = true;
 671        else
 672                rbd_dev->open_count++;
 673        spin_unlock_irq(&rbd_dev->lock);
 674        if (removing)
 675                return -ENOENT;
 676
 677        (void) get_device(&rbd_dev->dev);
 678
 679        return 0;
 680}
 681
 682static void rbd_release(struct gendisk *disk, fmode_t mode)
 683{
 684        struct rbd_device *rbd_dev = disk->private_data;
 685        unsigned long open_count_before;
 686
 687        spin_lock_irq(&rbd_dev->lock);
 688        open_count_before = rbd_dev->open_count--;
 689        spin_unlock_irq(&rbd_dev->lock);
 690        rbd_assert(open_count_before > 0);
 691
 692        put_device(&rbd_dev->dev);
 693}
 694
 695static const struct block_device_operations rbd_bd_ops = {
 696        .owner                  = THIS_MODULE,
 697        .open                   = rbd_open,
 698        .release                = rbd_release,
 699};
 700
 701/*
 702 * Initialize an rbd client instance.  Success or not, this function
 703 * consumes ceph_opts.  Caller holds client_mutex.
 704 */
 705static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 706{
 707        struct rbd_client *rbdc;
 708        int ret = -ENOMEM;
 709
 710        dout("%s:\n", __func__);
 711        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 712        if (!rbdc)
 713                goto out_opt;
 714
 715        kref_init(&rbdc->kref);
 716        INIT_LIST_HEAD(&rbdc->node);
 717
 718        rbdc->client = ceph_create_client(ceph_opts, rbdc);
 719        if (IS_ERR(rbdc->client))
 720                goto out_rbdc;
 721        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 722
 723        ret = ceph_open_session(rbdc->client);
 724        if (ret < 0)
 725                goto out_client;
 726
 727        spin_lock(&rbd_client_list_lock);
 728        list_add_tail(&rbdc->node, &rbd_client_list);
 729        spin_unlock(&rbd_client_list_lock);
 730
 731        dout("%s: rbdc %p\n", __func__, rbdc);
 732
 733        return rbdc;
 734out_client:
 735        ceph_destroy_client(rbdc->client);
 736out_rbdc:
 737        kfree(rbdc);
 738out_opt:
 739        if (ceph_opts)
 740                ceph_destroy_options(ceph_opts);
 741        dout("%s: error %d\n", __func__, ret);
 742
 743        return ERR_PTR(ret);
 744}
 745
 746static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 747{
 748        kref_get(&rbdc->kref);
 749
 750        return rbdc;
 751}
 752
 753/*
 754 * Find a ceph client with specific addr and configuration.  If
 755 * found, bump its reference count.
 756 */
 757static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 758{
 759        struct rbd_client *client_node;
 760        bool found = false;
 761
 762        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 763                return NULL;
 764
 765        spin_lock(&rbd_client_list_lock);
 766        list_for_each_entry(client_node, &rbd_client_list, node) {
 767                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 768                        __rbd_get_client(client_node);
 769
 770                        found = true;
 771                        break;
 772                }
 773        }
 774        spin_unlock(&rbd_client_list_lock);
 775
 776        return found ? client_node : NULL;
 777}
 778
 779/*
 780 * (Per device) rbd map options
 781 */
 782enum {
 783        Opt_queue_depth,
 784        Opt_alloc_size,
 785        Opt_lock_timeout,
 786        /* int args above */
 787        Opt_pool_ns,
 788        Opt_compression_hint,
 789        /* string args above */
 790        Opt_read_only,
 791        Opt_read_write,
 792        Opt_lock_on_read,
 793        Opt_exclusive,
 794        Opt_notrim,
 795};
 796
 797enum {
 798        Opt_compression_hint_none,
 799        Opt_compression_hint_compressible,
 800        Opt_compression_hint_incompressible,
 801};
 802
 803static const struct constant_table rbd_param_compression_hint[] = {
 804        {"none",                Opt_compression_hint_none},
 805        {"compressible",        Opt_compression_hint_compressible},
 806        {"incompressible",      Opt_compression_hint_incompressible},
 807        {}
 808};
 809
 810static const struct fs_parameter_spec rbd_parameters[] = {
 811        fsparam_u32     ("alloc_size",                  Opt_alloc_size),
 812        fsparam_enum    ("compression_hint",            Opt_compression_hint,
 813                         rbd_param_compression_hint),
 814        fsparam_flag    ("exclusive",                   Opt_exclusive),
 815        fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
 816        fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
 817        fsparam_flag    ("notrim",                      Opt_notrim),
 818        fsparam_string  ("_pool_ns",                    Opt_pool_ns),
 819        fsparam_u32     ("queue_depth",                 Opt_queue_depth),
 820        fsparam_flag    ("read_only",                   Opt_read_only),
 821        fsparam_flag    ("read_write",                  Opt_read_write),
 822        fsparam_flag    ("ro",                          Opt_read_only),
 823        fsparam_flag    ("rw",                          Opt_read_write),
 824        {}
 825};
 826
 827struct rbd_options {
 828        int     queue_depth;
 829        int     alloc_size;
 830        unsigned long   lock_timeout;
 831        bool    read_only;
 832        bool    lock_on_read;
 833        bool    exclusive;
 834        bool    trim;
 835
 836        u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 837};
 838
 839#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
 840#define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
 841#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
 842#define RBD_READ_ONLY_DEFAULT   false
 843#define RBD_LOCK_ON_READ_DEFAULT false
 844#define RBD_EXCLUSIVE_DEFAULT   false
 845#define RBD_TRIM_DEFAULT        true
 846
 847struct rbd_parse_opts_ctx {
 848        struct rbd_spec         *spec;
 849        struct ceph_options     *copts;
 850        struct rbd_options      *opts;
 851};
 852
 853static char* obj_op_name(enum obj_operation_type op_type)
 854{
 855        switch (op_type) {
 856        case OBJ_OP_READ:
 857                return "read";
 858        case OBJ_OP_WRITE:
 859                return "write";
 860        case OBJ_OP_DISCARD:
 861                return "discard";
 862        case OBJ_OP_ZEROOUT:
 863                return "zeroout";
 864        default:
 865                return "???";
 866        }
 867}
 868
 869/*
 870 * Destroy ceph client
 871 *
 872 * Caller must hold rbd_client_list_lock.
 873 */
 874static void rbd_client_release(struct kref *kref)
 875{
 876        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 877
 878        dout("%s: rbdc %p\n", __func__, rbdc);
 879        spin_lock(&rbd_client_list_lock);
 880        list_del(&rbdc->node);
 881        spin_unlock(&rbd_client_list_lock);
 882
 883        ceph_destroy_client(rbdc->client);
 884        kfree(rbdc);
 885}
 886
 887/*
 888 * Drop reference to ceph client node. If it's not referenced anymore, release
 889 * it.
 890 */
 891static void rbd_put_client(struct rbd_client *rbdc)
 892{
 893        if (rbdc)
 894                kref_put(&rbdc->kref, rbd_client_release);
 895}
 896
 897/*
  898 * Get a ceph client with specific addr and configuration; if one does
  899 * not exist, create it.  Either way, ceph_opts is consumed by this
  900 * function.
 901 */
 902static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 903{
 904        struct rbd_client *rbdc;
 905        int ret;
 906
 907        mutex_lock(&client_mutex);
 908        rbdc = rbd_client_find(ceph_opts);
 909        if (rbdc) {
 910                ceph_destroy_options(ceph_opts);
 911
 912                /*
 913                 * Using an existing client.  Make sure ->pg_pools is up to
 914                 * date before we look up the pool id in do_rbd_add().
 915                 */
 916                ret = ceph_wait_for_latest_osdmap(rbdc->client,
 917                                        rbdc->client->options->mount_timeout);
 918                if (ret) {
 919                        rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
 920                        rbd_put_client(rbdc);
 921                        rbdc = ERR_PTR(ret);
 922                }
 923        } else {
 924                rbdc = rbd_client_create(ceph_opts);
 925        }
 926        mutex_unlock(&client_mutex);
 927
 928        return rbdc;
 929}
 930
 931static bool rbd_image_format_valid(u32 image_format)
 932{
 933        return image_format == 1 || image_format == 2;
 934}
 935
 936static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 937{
 938        size_t size;
 939        u32 snap_count;
 940
 941        /* The header has to start with the magic rbd header text */
 942        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 943                return false;
 944
 945        /* The bio layer requires at least sector-sized I/O */
 946
 947        if (ondisk->options.order < SECTOR_SHIFT)
 948                return false;
 949
 950        /* If we use u64 in a few spots we may be able to loosen this */
 951
 952        if (ondisk->options.order > 8 * sizeof (int) - 1)
 953                return false;
 954
 955        /*
 956         * The size of a snapshot header has to fit in a size_t, and
 957         * that limits the number of snapshots.
 958         */
 959        snap_count = le32_to_cpu(ondisk->snap_count);
 960        size = SIZE_MAX - sizeof (struct ceph_snap_context);
 961        if (snap_count > size / sizeof (__le64))
 962                return false;
 963
 964        /*
  965         * Not only that, but the size of the entire snapshot
 966         * header must also be representable in a size_t.
 967         */
 968        size -= snap_count * sizeof (__le64);
 969        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
 970                return false;
 971
 972        return true;
 973}
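     /*
      * A rough worked bound for the check above: with a 32-bit size_t,
      * snap_count is capped at just under SIZE_MAX / 8, about 536
      * million; with a 64-bit size_t a 32-bit snap_count can never
      * exceed the limit, so the check only bites on 32-bit builds.
      */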
 974
 975/*
 976 * returns the size of an object in the image
 977 */
 978static u32 rbd_obj_bytes(struct rbd_image_header *header)
 979{
 980        return 1U << header->obj_order;
 981}
 982
 983static void rbd_init_layout(struct rbd_device *rbd_dev)
 984{
 985        if (rbd_dev->header.stripe_unit == 0 ||
 986            rbd_dev->header.stripe_count == 0) {
 987                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
 988                rbd_dev->header.stripe_count = 1;
 989        }
 990
 991        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
 992        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
 993        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
 994        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
 995                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
 996        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 997}
 998
 999/*
1000 * Fill an rbd image header with information from the given format 1
1001 * on-disk header.
1002 */
1003static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1004                                 struct rbd_image_header_ondisk *ondisk)
1005{
1006        struct rbd_image_header *header = &rbd_dev->header;
1007        bool first_time = header->object_prefix == NULL;
1008        struct ceph_snap_context *snapc;
1009        char *object_prefix = NULL;
1010        char *snap_names = NULL;
1011        u64 *snap_sizes = NULL;
1012        u32 snap_count;
1013        int ret = -ENOMEM;
1014        u32 i;
1015
1016        /* Allocate this now to avoid having to handle failure below */
1017
1018        if (first_time) {
1019                object_prefix = kstrndup(ondisk->object_prefix,
1020                                         sizeof(ondisk->object_prefix),
1021                                         GFP_KERNEL);
1022                if (!object_prefix)
1023                        return -ENOMEM;
1024        }
1025
1026        /* Allocate the snapshot context and fill it in */
1027
1028        snap_count = le32_to_cpu(ondisk->snap_count);
1029        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1030        if (!snapc)
1031                goto out_err;
1032        snapc->seq = le64_to_cpu(ondisk->snap_seq);
1033        if (snap_count) {
1034                struct rbd_image_snap_ondisk *snaps;
1035                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1036
1037                /* We'll keep a copy of the snapshot names... */
1038
1039                if (snap_names_len > (u64)SIZE_MAX)
1040                        goto out_2big;
1041                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1042                if (!snap_names)
1043                        goto out_err;
1044
1045                /* ...as well as the array of their sizes. */
1046                snap_sizes = kmalloc_array(snap_count,
1047                                           sizeof(*header->snap_sizes),
1048                                           GFP_KERNEL);
1049                if (!snap_sizes)
1050                        goto out_err;
1051
1052                /*
1053                 * Copy the names, and fill in each snapshot's id
1054                 * and size.
1055                 *
1056                 * Note that rbd_dev_v1_header_info() guarantees the
1057                 * ondisk buffer we're working with has
1058                 * snap_names_len bytes beyond the end of the
 1059                 * snapshot id array, so this memcpy() is safe.
1060                 */
1061                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1062                snaps = ondisk->snaps;
1063                for (i = 0; i < snap_count; i++) {
1064                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1065                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1066                }
1067        }
1068
1069        /* We won't fail any more, fill in the header */
1070
1071        if (first_time) {
1072                header->object_prefix = object_prefix;
1073                header->obj_order = ondisk->options.order;
1074                rbd_init_layout(rbd_dev);
1075        } else {
1076                ceph_put_snap_context(header->snapc);
1077                kfree(header->snap_names);
1078                kfree(header->snap_sizes);
1079        }
1080
1081        /* The remaining fields always get updated (when we refresh) */
1082
1083        header->image_size = le64_to_cpu(ondisk->image_size);
1084        header->snapc = snapc;
1085        header->snap_names = snap_names;
1086        header->snap_sizes = snap_sizes;
1087
1088        return 0;
1089out_2big:
1090        ret = -EIO;
1091out_err:
1092        kfree(snap_sizes);
1093        kfree(snap_names);
1094        ceph_put_snap_context(snapc);
1095        kfree(object_prefix);
1096
1097        return ret;
1098}
1099
1100static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1101{
1102        const char *snap_name;
1103
1104        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1105
1106        /* Skip over names until we find the one we are looking for */
1107
1108        snap_name = rbd_dev->header.snap_names;
1109        while (which--)
1110                snap_name += strlen(snap_name) + 1;
1111
1112        return kstrdup(snap_name, GFP_KERNEL);
1113}
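     /*
      * Illustrative layout (hypothetical names): for three snapshots the
      * format 1 header keeps the names packed back to back and
      * NUL-terminated,
      *
      *     snap_names: "third\0second\0first\0"
      *
      * in the same order as header.snapc->snaps[] (highest snapshot id
      * first), so which == 1 skips strlen("third") + 1 bytes and returns
      * a copy of "second".
      */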
1114
1115/*
1116 * Snapshot id comparison function for use with qsort()/bsearch().
1117 * Note that result is for snapshots in *descending* order.
1118 */
1119static int snapid_compare_reverse(const void *s1, const void *s2)
1120{
1121        u64 snap_id1 = *(u64 *)s1;
1122        u64 snap_id2 = *(u64 *)s2;
1123
1124        if (snap_id1 < snap_id2)
1125                return 1;
1126        return snap_id1 == snap_id2 ? 0 : -1;
1127}
1128
1129/*
1130 * Search a snapshot context to see if the given snapshot id is
1131 * present.
1132 *
1133 * Returns the position of the snapshot id in the array if it's found,
1134 * or BAD_SNAP_INDEX otherwise.
1135 *
 1136 * Note: The snapshot array is kept sorted (by the osd) in
1137 * reverse order, highest snapshot id first.
1138 */
1139static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1140{
1141        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1142        u64 *found;
1143
1144        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1145                                sizeof (snap_id), snapid_compare_reverse);
1146
1147        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1148}
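     /*
      * Worked example (hypothetical ids): with snapc->snaps[] = { 12, 7, 3 }
      * -- descending, as noted above -- looking up snap_id 7 yields index 1
      * and snap_id 5 yields BAD_SNAP_INDEX.  The "reverse" comparator
      * reports a key that is numerically larger than an element as smaller,
      * which is what lets bsearch() walk a descending array.
      */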
1149
1150static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1151                                        u64 snap_id)
1152{
1153        u32 which;
1154        const char *snap_name;
1155
1156        which = rbd_dev_snap_index(rbd_dev, snap_id);
1157        if (which == BAD_SNAP_INDEX)
1158                return ERR_PTR(-ENOENT);
1159
1160        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1161        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1162}
1163
1164static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1165{
1166        if (snap_id == CEPH_NOSNAP)
1167                return RBD_SNAP_HEAD_NAME;
1168
1169        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1170        if (rbd_dev->image_format == 1)
1171                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1172
1173        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1174}
1175
1176static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1177                                u64 *snap_size)
1178{
1179        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1180        if (snap_id == CEPH_NOSNAP) {
1181                *snap_size = rbd_dev->header.image_size;
1182        } else if (rbd_dev->image_format == 1) {
1183                u32 which;
1184
1185                which = rbd_dev_snap_index(rbd_dev, snap_id);
1186                if (which == BAD_SNAP_INDEX)
1187                        return -ENOENT;
1188
1189                *snap_size = rbd_dev->header.snap_sizes[which];
1190        } else {
1191                u64 size = 0;
1192                int ret;
1193
1194                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1195                if (ret)
1196                        return ret;
1197
1198                *snap_size = size;
1199        }
1200        return 0;
1201}
1202
1203static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1204{
1205        u64 snap_id = rbd_dev->spec->snap_id;
1206        u64 size = 0;
1207        int ret;
1208
1209        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1210        if (ret)
1211                return ret;
1212
1213        rbd_dev->mapping.size = size;
1214        return 0;
1215}
1216
1217static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1218{
1219        rbd_dev->mapping.size = 0;
1220}
1221
1222static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1223{
1224        struct ceph_bio_iter it = *bio_pos;
1225
1226        ceph_bio_iter_advance(&it, off);
1227        ceph_bio_iter_advance_step(&it, bytes, ({
1228                memzero_bvec(&bv);
1229        }));
1230}
1231
1232static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1233{
1234        struct ceph_bvec_iter it = *bvec_pos;
1235
1236        ceph_bvec_iter_advance(&it, off);
1237        ceph_bvec_iter_advance_step(&it, bytes, ({
1238                memzero_bvec(&bv);
1239        }));
1240}
1241
1242/*
1243 * Zero a range in @obj_req data buffer defined by a bio (list) or
1244 * (private) bio_vec array.
1245 *
1246 * @off is relative to the start of the data buffer.
1247 */
1248static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1249                               u32 bytes)
1250{
1251        dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1252
1253        switch (obj_req->img_request->data_type) {
1254        case OBJ_REQUEST_BIO:
1255                zero_bios(&obj_req->bio_pos, off, bytes);
1256                break;
1257        case OBJ_REQUEST_BVECS:
1258        case OBJ_REQUEST_OWN_BVECS:
1259                zero_bvecs(&obj_req->bvec_pos, off, bytes);
1260                break;
1261        default:
1262                BUG();
1263        }
1264}
1265
1266static void rbd_obj_request_destroy(struct kref *kref);
1267static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1268{
1269        rbd_assert(obj_request != NULL);
1270        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1271                kref_read(&obj_request->kref));
1272        kref_put(&obj_request->kref, rbd_obj_request_destroy);
1273}
1274
1275static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1276                                        struct rbd_obj_request *obj_request)
1277{
1278        rbd_assert(obj_request->img_request == NULL);
1279
1280        /* Image request now owns object's original reference */
1281        obj_request->img_request = img_request;
1282        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1283}
1284
1285static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1286                                        struct rbd_obj_request *obj_request)
1287{
1288        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1289        list_del(&obj_request->ex.oe_item);
1290        rbd_assert(obj_request->img_request == img_request);
1291        rbd_obj_request_put(obj_request);
1292}
1293
1294static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1295{
1296        struct rbd_obj_request *obj_req = osd_req->r_priv;
1297
1298        dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1299             __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1300             obj_req->ex.oe_off, obj_req->ex.oe_len);
1301        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1302}
1303
1304/*
1305 * The default/initial value for all image request flags is 0.  Each
1306 * is conditionally set to 1 at image request initialization time
 1307 * and currently never changes thereafter.
1308 */
1309static void img_request_layered_set(struct rbd_img_request *img_request)
1310{
1311        set_bit(IMG_REQ_LAYERED, &img_request->flags);
1312}
1313
1314static bool img_request_layered_test(struct rbd_img_request *img_request)
1315{
1316        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1317}
1318
1319static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1320{
1321        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1322
1323        return !obj_req->ex.oe_off &&
1324               obj_req->ex.oe_len == rbd_dev->layout.object_size;
1325}
1326
1327static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1328{
1329        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1330
1331        return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1332                                        rbd_dev->layout.object_size;
1333}
1334
1335/*
1336 * Must be called after rbd_obj_calc_img_extents().
1337 */
1338static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1339{
1340        if (!obj_req->num_img_extents ||
1341            (rbd_obj_is_entire(obj_req) &&
1342             !obj_req->img_request->snapc->num_snaps))
1343                return false;
1344
1345        return true;
1346}
1347
1348static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1349{
1350        return ceph_file_extents_bytes(obj_req->img_extents,
1351                                       obj_req->num_img_extents);
1352}
1353
1354static bool rbd_img_is_write(struct rbd_img_request *img_req)
1355{
1356        switch (img_req->op_type) {
1357        case OBJ_OP_READ:
1358                return false;
1359        case OBJ_OP_WRITE:
1360        case OBJ_OP_DISCARD:
1361        case OBJ_OP_ZEROOUT:
1362                return true;
1363        default:
1364                BUG();
1365        }
1366}
1367
1368static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1369{
1370        struct rbd_obj_request *obj_req = osd_req->r_priv;
1371        int result;
1372
1373        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1374             osd_req->r_result, obj_req);
1375
1376        /*
1377         * Writes aren't allowed to return a data payload.  In some
1378         * guarded write cases (e.g. stat + zero on an empty object)
1379         * a stat response makes it through, but we don't care.
1380         */
1381        if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1382                result = 0;
1383        else
1384                result = osd_req->r_result;
1385
1386        rbd_obj_handle_request(obj_req, result);
1387}
1388
1389static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1390{
1391        struct rbd_obj_request *obj_request = osd_req->r_priv;
1392        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1393        struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1394
1395        osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1396        osd_req->r_snapid = obj_request->img_request->snap_id;
1397}
1398
1399static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1400{
1401        struct rbd_obj_request *obj_request = osd_req->r_priv;
1402
1403        osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1404        ktime_get_real_ts64(&osd_req->r_mtime);
1405        osd_req->r_data_offset = obj_request->ex.oe_off;
1406}
1407
1408static struct ceph_osd_request *
1409__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1410                          struct ceph_snap_context *snapc, int num_ops)
1411{
1412        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1413        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1414        struct ceph_osd_request *req;
1415        const char *name_format = rbd_dev->image_format == 1 ?
1416                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1417        int ret;
1418
1419        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1420        if (!req)
1421                return ERR_PTR(-ENOMEM);
1422
1423        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1424        req->r_callback = rbd_osd_req_callback;
1425        req->r_priv = obj_req;
1426
1427        /*
 1428         * Data objects may be stored in a separate pool, but they always
 1429         * use the same namespace there as the header object does in its pool.
1430         */
1431        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1432        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1433
1434        ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1435                               rbd_dev->header.object_prefix,
1436                               obj_req->ex.oe_objno);
1437        if (ret)
1438                return ERR_PTR(ret);
1439
1440        return req;
1441}
1442
1443static struct ceph_osd_request *
1444rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1445{
1446        return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1447                                         num_ops);
1448}
1449
1450static struct rbd_obj_request *rbd_obj_request_create(void)
1451{
1452        struct rbd_obj_request *obj_request;
1453
1454        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1455        if (!obj_request)
1456                return NULL;
1457
1458        ceph_object_extent_init(&obj_request->ex);
1459        INIT_LIST_HEAD(&obj_request->osd_reqs);
1460        mutex_init(&obj_request->state_mutex);
1461        kref_init(&obj_request->kref);
1462
1463        dout("%s %p\n", __func__, obj_request);
1464        return obj_request;
1465}
1466
1467static void rbd_obj_request_destroy(struct kref *kref)
1468{
1469        struct rbd_obj_request *obj_request;
1470        struct ceph_osd_request *osd_req;
1471        u32 i;
1472
1473        obj_request = container_of(kref, struct rbd_obj_request, kref);
1474
1475        dout("%s: obj %p\n", __func__, obj_request);
1476
1477        while (!list_empty(&obj_request->osd_reqs)) {
1478                osd_req = list_first_entry(&obj_request->osd_reqs,
1479                                    struct ceph_osd_request, r_private_item);
1480                list_del_init(&osd_req->r_private_item);
1481                ceph_osdc_put_request(osd_req);
1482        }
1483
1484        switch (obj_request->img_request->data_type) {
1485        case OBJ_REQUEST_NODATA:
1486        case OBJ_REQUEST_BIO:
1487        case OBJ_REQUEST_BVECS:
1488                break;          /* Nothing to do */
1489        case OBJ_REQUEST_OWN_BVECS:
1490                kfree(obj_request->bvec_pos.bvecs);
1491                break;
1492        default:
1493                BUG();
1494        }
1495
1496        kfree(obj_request->img_extents);
1497        if (obj_request->copyup_bvecs) {
1498                for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1499                        if (obj_request->copyup_bvecs[i].bv_page)
1500                                __free_page(obj_request->copyup_bvecs[i].bv_page);
1501                }
1502                kfree(obj_request->copyup_bvecs);
1503        }
1504
1505        kmem_cache_free(rbd_obj_request_cache, obj_request);
1506}
1507
1508/* It's OK to call this for a device with no parent */
1509
1510static void rbd_spec_put(struct rbd_spec *spec);
1511static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1512{
1513        rbd_dev_remove_parent(rbd_dev);
1514        rbd_spec_put(rbd_dev->parent_spec);
1515        rbd_dev->parent_spec = NULL;
1516        rbd_dev->parent_overlap = 0;
1517}
1518
1519/*
1520 * Parent image reference counting is used to determine when an
1521 * image's parent fields can be safely torn down--after there are no
1522 * more in-flight requests to the parent image.  When the last
1523 * reference is dropped, cleaning them up is safe.
1524 */
1525static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1526{
1527        int counter;
1528
1529        if (!rbd_dev->parent_spec)
1530                return;
1531
1532        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1533        if (counter > 0)
1534                return;
1535
1536        /* Last reference; clean up parent data structures */
1537
1538        if (!counter)
1539                rbd_dev_unparent(rbd_dev);
1540        else
1541                rbd_warn(rbd_dev, "parent reference underflow");
1542}
1543
1544/*
1545 * If an image has a non-zero parent overlap, get a reference to its
1546 * parent.
1547 *
1548 * Returns true if the rbd device has a parent with a non-zero
1549 * overlap and a reference for it was successfully taken, or
1550 * false otherwise.
1551 */
1552static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1553{
1554        int counter = 0;
1555
1556        if (!rbd_dev->parent_spec)
1557                return false;
1558
1559        if (rbd_dev->parent_overlap)
1560                counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1561
1562        if (counter < 0)
1563                rbd_warn(rbd_dev, "parent reference overflow");
1564
1565        return counter > 0;
1566}
1567
1568static void rbd_img_request_init(struct rbd_img_request *img_request,
1569                                 struct rbd_device *rbd_dev,
1570                                 enum obj_operation_type op_type)
1571{
1572        memset(img_request, 0, sizeof(*img_request));
1573
1574        img_request->rbd_dev = rbd_dev;
1575        img_request->op_type = op_type;
1576
1577        INIT_LIST_HEAD(&img_request->lock_item);
1578        INIT_LIST_HEAD(&img_request->object_extents);
1579        mutex_init(&img_request->state_mutex);
1580}
1581
1582static void rbd_img_capture_header(struct rbd_img_request *img_req)
1583{
1584        struct rbd_device *rbd_dev = img_req->rbd_dev;
1585
1586        lockdep_assert_held(&rbd_dev->header_rwsem);
1587
1588        if (rbd_img_is_write(img_req))
1589                img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1590        else
1591                img_req->snap_id = rbd_dev->spec->snap_id;
1592
1593        if (rbd_dev_parent_get(rbd_dev))
1594                img_request_layered_set(img_req);
1595}
1596
1597static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1598{
1599        struct rbd_obj_request *obj_request;
1600        struct rbd_obj_request *next_obj_request;
1601
1602        dout("%s: img %p\n", __func__, img_request);
1603
1604        WARN_ON(!list_empty(&img_request->lock_item));
1605        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1606                rbd_img_obj_request_del(img_request, obj_request);
1607
1608        if (img_request_layered_test(img_request))
1609                rbd_dev_parent_put(img_request->rbd_dev);
1610
1611        if (rbd_img_is_write(img_request))
1612                ceph_put_snap_context(img_request->snapc);
1613
1614        if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1615                kmem_cache_free(rbd_img_request_cache, img_request);
1616}
1617
1618#define BITS_PER_OBJ    2
1619#define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1620#define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1621
1622static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1623                                   u64 *index, u8 *shift)
1624{
1625        u32 off;
1626
1627        rbd_assert(objno < rbd_dev->object_map_size);
1628        *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1629        *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1630}
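/*
 * Worked example of the packing above (illustrative only, not used by the
 * code): with BITS_PER_OBJ == 2 each byte holds the state of
 * OBJS_PER_BYTE == 4 objects, most significant bits first.  For objno == 5:
 *
 *	index = 5 / 4 = 1, off = 5 % 4 = 1
 *	shift = (4 - 1 - 1) * BITS_PER_OBJ = 4
 *
 * so object 5's state lives in bits 5:4 of object_map[1] and is read as
 *
 *	state = (object_map[1] >> 4) & OBJ_MASK;
 */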
1631
1632static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1633{
1634        u64 index;
1635        u8 shift;
1636
1637        lockdep_assert_held(&rbd_dev->object_map_lock);
1638        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1639        return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1640}
1641
1642static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1643{
1644        u64 index;
1645        u8 shift;
1646        u8 *p;
1647
1648        lockdep_assert_held(&rbd_dev->object_map_lock);
1649        rbd_assert(!(val & ~OBJ_MASK));
1650
1651        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1652        p = &rbd_dev->object_map[index];
1653        *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1654}
1655
1656static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1657{
1658        u8 state;
1659
1660        spin_lock(&rbd_dev->object_map_lock);
1661        state = __rbd_object_map_get(rbd_dev, objno);
1662        spin_unlock(&rbd_dev->object_map_lock);
1663        return state;
1664}
1665
1666static bool use_object_map(struct rbd_device *rbd_dev)
1667{
1668        /*
1669         * An image mapped read-only can't use the object map -- it isn't
1670         * loaded because the header lock isn't acquired.  Someone else can
1671         * write to the image and update the object map behind our back.
1672         *
1673         * A snapshot can't be written to, so using the object map is always
1674         * safe.
1675         */
1676        if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1677                return false;
1678
1679        return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1680                !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1681}
1682
1683static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1684{
1685        u8 state;
1686
1687        /* fall back to default logic if object map is disabled or invalid */
1688        if (!use_object_map(rbd_dev))
1689                return true;
1690
1691        state = rbd_object_map_get(rbd_dev, objno);
1692        return state != OBJECT_NONEXISTENT;
1693}
1694
1695static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1696                                struct ceph_object_id *oid)
1697{
1698        if (snap_id == CEPH_NOSNAP)
1699                ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1700                                rbd_dev->spec->image_id);
1701        else
1702                ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1703                                rbd_dev->spec->image_id, snap_id);
1704}
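/*
 * Illustrative OID names only -- assuming RBD_OBJECT_MAP_PREFIX expands to
 * "rbd_object_map." as defined in rbd_types.h, a (made-up) image id of
 * "1018e56b8b4567" gives:
 *
 *	HEAD object map:           rbd_object_map.1018e56b8b4567
 *	object map for snap 0x14:  rbd_object_map.1018e56b8b4567.0000000000000014
 */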
1705
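/*
 * Take the exclusive cls lock on the HEAD object map object.  If the lock
 * is held by another client, look up the lockers, break the stale lock
 * (with a warning) and retry.  -EEXIST from ceph_cls_lock() means we
 * already own the lock and is treated as success.
 */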
1706static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1707{
1708        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1709        CEPH_DEFINE_OID_ONSTACK(oid);
1710        u8 lock_type;
1711        char *lock_tag;
1712        struct ceph_locker *lockers;
1713        u32 num_lockers;
1714        bool broke_lock = false;
1715        int ret;
1716
1717        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1718
1719again:
1720        ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1721                            CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1722        if (ret != -EBUSY || broke_lock) {
1723                if (ret == -EEXIST)
1724                        ret = 0; /* already locked by myself */
1725                if (ret)
1726                        rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1727                return ret;
1728        }
1729
1730        ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1731                                 RBD_LOCK_NAME, &lock_type, &lock_tag,
1732                                 &lockers, &num_lockers);
1733        if (ret) {
1734                if (ret == -ENOENT)
1735                        goto again;
1736
1737                rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1738                return ret;
1739        }
1740
1741        kfree(lock_tag);
1742        if (num_lockers == 0)
1743                goto again;
1744
1745        rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1746                 ENTITY_NAME(lockers[0].id.name));
1747
1748        ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1749                                  RBD_LOCK_NAME, lockers[0].id.cookie,
1750                                  &lockers[0].id.name);
1751        ceph_free_lockers(lockers, num_lockers);
1752        if (ret) {
1753                if (ret == -ENOENT)
1754                        goto again;
1755
1756                rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1757                return ret;
1758        }
1759
1760        broke_lock = true;
1761        goto again;
1762}
1763
1764static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1765{
1766        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1767        CEPH_DEFINE_OID_ONSTACK(oid);
1768        int ret;
1769
1770        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1771
1772        ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1773                              "");
1774        if (ret && ret != -ENOENT)
1775                rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1776}
1777
1778static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1779{
1780        u8 struct_v;
1781        u32 struct_len;
1782        u32 header_len;
1783        void *header_end;
1784        int ret;
1785
1786        ceph_decode_32_safe(p, end, header_len, e_inval);
1787        header_end = *p + header_len;
1788
1789        ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1790                                  &struct_len);
1791        if (ret)
1792                return ret;
1793
1794        ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1795
1796        *p = header_end;
1797        return 0;
1798
1799e_inval:
1800        return -EINVAL;
1801}
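/*
 * On-wire layout consumed above (any fields we don't decode are skipped by
 * the jump to header_end):
 *
 *	le32 header_len;		 length of the header bytes that follow
 *	u8   struct_v, struct_compat;	 ceph_start_decoding()
 *	le32 struct_len;
 *	le64 object_map_size;		 map size in objects
 *	...				 remaining BitVector header fields
 */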
1802
1803static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1804{
1805        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1806        CEPH_DEFINE_OID_ONSTACK(oid);
1807        struct page **pages;
1808        void *p, *end;
1809        size_t reply_len;
1810        u64 num_objects;
1811        u64 object_map_bytes;
1812        u64 object_map_size;
1813        int num_pages;
1814        int ret;
1815
1816        rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1817
1818        num_objects = ceph_get_num_objects(&rbd_dev->layout,
1819                                           rbd_dev->mapping.size);
1820        object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1821                                            BITS_PER_BYTE);
1822        num_pages = calc_pages_for(0, object_map_bytes) + 1;
1823        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1824        if (IS_ERR(pages))
1825                return PTR_ERR(pages);
1826
1827        reply_len = num_pages * PAGE_SIZE;
1828        rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1829        ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1830                             "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1831                             NULL, 0, pages, &reply_len);
1832        if (ret)
1833                goto out;
1834
1835        p = page_address(pages[0]);
1836        end = p + min(reply_len, (size_t)PAGE_SIZE);
1837        ret = decode_object_map_header(&p, end, &object_map_size);
1838        if (ret)
1839                goto out;
1840
1841        if (object_map_size != num_objects) {
1842                rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1843                         object_map_size, num_objects);
1844                ret = -EINVAL;
1845                goto out;
1846        }
1847
1848        if (offset_in_page(p) + object_map_bytes > reply_len) {
1849                ret = -EINVAL;
1850                goto out;
1851        }
1852
1853        rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1854        if (!rbd_dev->object_map) {
1855                ret = -ENOMEM;
1856                goto out;
1857        }
1858
1859        rbd_dev->object_map_size = object_map_size;
1860        ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1861                                   offset_in_page(p), object_map_bytes);
1862
1863out:
1864        ceph_release_page_vector(pages, num_pages);
1865        return ret;
1866}
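/*
 * Sizing sketch (hypothetical numbers): for a 1 GiB mapping with 4 MiB
 * objects, num_objects = 256, so the map needs
 * DIV_ROUND_UP(256 * 2, 8) = 64 bytes.  calc_pages_for(0, 64) = 1, and the
 * extra "+ 1" page leaves room for the BitVector header that precedes the
 * map data in the object_map_load reply.
 */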
1867
1868static void rbd_object_map_free(struct rbd_device *rbd_dev)
1869{
1870        kvfree(rbd_dev->object_map);
1871        rbd_dev->object_map = NULL;
1872        rbd_dev->object_map_size = 0;
1873}
1874
1875static int rbd_object_map_load(struct rbd_device *rbd_dev)
1876{
1877        int ret;
1878
1879        ret = __rbd_object_map_load(rbd_dev);
1880        if (ret)
1881                return ret;
1882
1883        ret = rbd_dev_v2_get_flags(rbd_dev);
1884        if (ret) {
1885                rbd_object_map_free(rbd_dev);
1886                return ret;
1887        }
1888
1889        if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1890                rbd_warn(rbd_dev, "object map is invalid");
1891
1892        return 0;
1893}
1894
1895static int rbd_object_map_open(struct rbd_device *rbd_dev)
1896{
1897        int ret;
1898
1899        ret = rbd_object_map_lock(rbd_dev);
1900        if (ret)
1901                return ret;
1902
1903        ret = rbd_object_map_load(rbd_dev);
1904        if (ret) {
1905                rbd_object_map_unlock(rbd_dev);
1906                return ret;
1907        }
1908
1909        return 0;
1910}
1911
1912static void rbd_object_map_close(struct rbd_device *rbd_dev)
1913{
1914        rbd_object_map_free(rbd_dev);
1915        rbd_object_map_unlock(rbd_dev);
1916}
1917
1918/*
1919 * This function needs snap_id (or more precisely just something to
1920 * distinguish between HEAD and snapshot object maps), new_state and
1921 * current_state that were passed to rbd_object_map_update().
1922 *
1923 * To avoid allocating and stashing a context we piggyback on the OSD
1924 * request.  A HEAD update has two ops (assert_locked).  For new_state
1925 * and current_state we decode our own object_map_update op, encoded in
1926 * rbd_cls_object_map_update().
1927 */
1928static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1929                                        struct ceph_osd_request *osd_req)
1930{
1931        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1932        struct ceph_osd_data *osd_data;
1933        u64 objno;
1934        u8 state, new_state, current_state;
1935        bool has_current_state;
1936        void *p;
1937
1938        if (osd_req->r_result)
1939                return osd_req->r_result;
1940
1941        /*
1942         * Nothing to do for a snapshot object map.
1943         */
1944        if (osd_req->r_num_ops == 1)
1945                return 0;
1946
1947        /*
1948         * Update in-memory HEAD object map.
1949         */
1950        rbd_assert(osd_req->r_num_ops == 2);
1951        osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1952        rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1953
1954        p = page_address(osd_data->pages[0]);
1955        objno = ceph_decode_64(&p);
1956        rbd_assert(objno == obj_req->ex.oe_objno);
1957        rbd_assert(ceph_decode_64(&p) == objno + 1);
1958        new_state = ceph_decode_8(&p);
1959        has_current_state = ceph_decode_8(&p);
1960        if (has_current_state)
1961                current_state = ceph_decode_8(&p);
1962
1963        spin_lock(&rbd_dev->object_map_lock);
1964        state = __rbd_object_map_get(rbd_dev, objno);
1965        if (!has_current_state || current_state == state ||
1966            (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1967                __rbd_object_map_set(rbd_dev, objno, new_state);
1968        spin_unlock(&rbd_dev->object_map_lock);
1969
1970        return 0;
1971}
1972
1973static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1974{
1975        struct rbd_obj_request *obj_req = osd_req->r_priv;
1976        int result;
1977
1978        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1979             osd_req->r_result, obj_req);
1980
1981        result = rbd_object_map_update_finish(obj_req, osd_req);
1982        rbd_obj_handle_request(obj_req, result);
1983}
1984
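/*
 * Decide whether a HEAD object map update for objno is actually needed:
 * skip it if the map already records new_state, if a nonexistent object
 * would be marked PENDING, or if an object that isn't PENDING would be
 * marked NONEXISTENT.
 */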
1985static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
1986{
1987        u8 state = rbd_object_map_get(rbd_dev, objno);
1988
1989        if (state == new_state ||
1990            (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
1991            (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
1992                return false;
1993
1994        return true;
1995}
1996
1997static int rbd_cls_object_map_update(struct ceph_osd_request *req,
1998                                     int which, u64 objno, u8 new_state,
1999                                     const u8 *current_state)
2000{
2001        struct page **pages;
2002        void *p, *start;
2003        int ret;
2004
2005        ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2006        if (ret)
2007                return ret;
2008
2009        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2010        if (IS_ERR(pages))
2011                return PTR_ERR(pages);
2012
2013        p = start = page_address(pages[0]);
2014        ceph_encode_64(&p, objno);
2015        ceph_encode_64(&p, objno + 1);
2016        ceph_encode_8(&p, new_state);
2017        if (current_state) {
2018                ceph_encode_8(&p, 1);
2019                ceph_encode_8(&p, *current_state);
2020        } else {
2021                ceph_encode_8(&p, 0);
2022        }
2023
2024        osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2025                                          false, true);
2026        return 0;
2027}
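/*
 * The request_data buffer encoded above (and decoded again in
 * rbd_object_map_update_finish()) looks like this for, say, objno 12,
 * new_state OBJECT_EXISTS and no current_state:
 *
 *	le64 start_objno       = 12
 *	le64 end_objno         = 13	 objno + 1, a single-object range
 *	u8   new_state         = OBJECT_EXISTS
 *	u8   has_current_state = 0	 no current_state byte follows
 *
 * 18 bytes total.  rbd_object_map_update_finish() tells a HEAD update
 * apart from a snapshot update by r_num_ops (2 vs 1, the extra op being
 * assert_locked).
 */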
2028
2029/*
2030 * Return:
2031 *   0 - object map update sent
2032 *   1 - object map update isn't needed
2033 *  <0 - error
2034 */
2035static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2036                                 u8 new_state, const u8 *current_state)
2037{
2038        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2039        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2040        struct ceph_osd_request *req;
2041        int num_ops = 1;
2042        int which = 0;
2043        int ret;
2044
2045        if (snap_id == CEPH_NOSNAP) {
2046                if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2047                        return 1;
2048
2049                num_ops++; /* assert_locked */
2050        }
2051
2052        req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2053        if (!req)
2054                return -ENOMEM;
2055
2056        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2057        req->r_callback = rbd_object_map_callback;
2058        req->r_priv = obj_req;
2059
2060        rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2061        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2062        req->r_flags = CEPH_OSD_FLAG_WRITE;
2063        ktime_get_real_ts64(&req->r_mtime);
2064
2065        if (snap_id == CEPH_NOSNAP) {
2066                /*
2067                 * Protect against possible race conditions during lock
2068                 * ownership transitions.
2069                 */
2070                ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2071                                             CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2072                if (ret)
2073                        return ret;
2074        }
2075
2076        ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2077                                        new_state, current_state);
2078        if (ret)
2079                return ret;
2080
2081        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2082        if (ret)
2083                return ret;
2084
2085        ceph_osdc_start_request(osdc, req, false);
2086        return 0;
2087}
2088
2089static void prune_extents(struct ceph_file_extent *img_extents,
2090                          u32 *num_img_extents, u64 overlap)
2091{
2092        u32 cnt = *num_img_extents;
2093
2094        /* drop extents completely beyond the overlap */
2095        while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2096                cnt--;
2097
2098        if (cnt) {
2099                struct ceph_file_extent *ex = &img_extents[cnt - 1];
2100
2101                /* trim final overlapping extent */
2102                if (ex->fe_off + ex->fe_len > overlap)
2103                        ex->fe_len = overlap - ex->fe_off;
2104        }
2105
2106        *num_img_extents = cnt;
2107}
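/*
 * Example with a parent overlap of 0x100000: the extents
 * { 0x20000, 0x40000 }, { 0xe0000, 0x40000 }, { 0x100000, 0x80000 }
 * become { 0x20000, 0x40000 }, { 0xe0000, 0x20000 } -- the last extent
 * starts at the overlap and is dropped, and the second one is trimmed so
 * that it ends exactly at the overlap.
 */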
2108
2109/*
2110 * Determine the byte range(s) covered by either just the object extent
2111 * or the entire object in the parent image.
2112 */
2113static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2114                                    bool entire)
2115{
2116        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2117        int ret;
2118
2119        if (!rbd_dev->parent_overlap)
2120                return 0;
2121
2122        ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2123                                  entire ? 0 : obj_req->ex.oe_off,
2124                                  entire ? rbd_dev->layout.object_size :
2125                                                        obj_req->ex.oe_len,
2126                                  &obj_req->img_extents,
2127                                  &obj_req->num_img_extents);
2128        if (ret)
2129                return ret;
2130
2131        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2132                      rbd_dev->parent_overlap);
2133        return 0;
2134}
2135
2136static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2137{
2138        struct rbd_obj_request *obj_req = osd_req->r_priv;
2139
2140        switch (obj_req->img_request->data_type) {
2141        case OBJ_REQUEST_BIO:
2142                osd_req_op_extent_osd_data_bio(osd_req, which,
2143                                               &obj_req->bio_pos,
2144                                               obj_req->ex.oe_len);
2145                break;
2146        case OBJ_REQUEST_BVECS:
2147        case OBJ_REQUEST_OWN_BVECS:
2148                rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2149                                                        obj_req->ex.oe_len);
2150                rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2151                osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2152                                                    &obj_req->bvec_pos);
2153                break;
2154        default:
2155                BUG();
2156        }
2157}
2158
2159static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2160{
2161        struct page **pages;
2162
2163        /*
2164         * The response data for a STAT call consists of:
2165         *     le64 length;
2166         *     struct {
2167         *         le32 tv_sec;
2168         *         le32 tv_nsec;
2169         *     } mtime;
2170         */
2171        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2172        if (IS_ERR(pages))
2173                return PTR_ERR(pages);
2174
2175        osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2176        osd_req_op_raw_data_in_pages(osd_req, which, pages,
2177                                     8 + sizeof(struct ceph_timespec),
2178                                     0, false, true);
2179        return 0;
2180}
2181
2182static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2183                                u32 bytes)
2184{
2185        struct rbd_obj_request *obj_req = osd_req->r_priv;
2186        int ret;
2187
2188        ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2189        if (ret)
2190                return ret;
2191
2192        osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2193                                          obj_req->copyup_bvec_count, bytes);
2194        return 0;
2195}
2196
2197static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2198{
2199        obj_req->read_state = RBD_OBJ_READ_START;
2200        return 0;
2201}
2202
2203static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2204                                      int which)
2205{
2206        struct rbd_obj_request *obj_req = osd_req->r_priv;
2207        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2208        u16 opcode;
2209
2210        if (!use_object_map(rbd_dev) ||
2211            !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2212                osd_req_op_alloc_hint_init(osd_req, which++,
2213                                           rbd_dev->layout.object_size,
2214                                           rbd_dev->layout.object_size,
2215                                           rbd_dev->opts->alloc_hint_flags);
2216        }
2217
2218        if (rbd_obj_is_entire(obj_req))
2219                opcode = CEPH_OSD_OP_WRITEFULL;
2220        else
2221                opcode = CEPH_OSD_OP_WRITE;
2222
2223        osd_req_op_extent_init(osd_req, which, opcode,
2224                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2225        rbd_osd_setup_data(osd_req, which);
2226}
2227
2228static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2229{
2230        int ret;
2231
2232        /* reverse map the entire object onto the parent */
2233        ret = rbd_obj_calc_img_extents(obj_req, true);
2234        if (ret)
2235                return ret;
2236
2237        if (rbd_obj_copyup_enabled(obj_req))
2238                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2239
2240        obj_req->write_state = RBD_OBJ_WRITE_START;
2241        return 0;
2242}
2243
2244static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2245{
2246        return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2247                                          CEPH_OSD_OP_ZERO;
2248}
2249
2250static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2251                                        int which)
2252{
2253        struct rbd_obj_request *obj_req = osd_req->r_priv;
2254
2255        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2256                rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2257                osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2258        } else {
2259                osd_req_op_extent_init(osd_req, which,
2260                                       truncate_or_zero_opcode(obj_req),
2261                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2262                                       0, 0);
2263        }
2264}
2265
2266static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2267{
2268        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2269        u64 off, next_off;
2270        int ret;
2271
2272        /*
2273         * Align the range to alloc_size boundary and punt on discards
2274         * that are too small to free up any space.
2275         *
2276         * alloc_size == object_size && is_tail() is a special case for
2277         * filestore with filestore_punch_hole = false, needed to allow
2278         * truncate (in addition to delete).
2279         */
2280        if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2281            !rbd_obj_is_tail(obj_req)) {
2282                off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2283                next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2284                                      rbd_dev->opts->alloc_size);
2285                if (off >= next_off)
2286                        return 1;
2287
2288                dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2289                     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2290                     off, next_off - off);
2291                obj_req->ex.oe_off = off;
2292                obj_req->ex.oe_len = next_off - off;
2293        }
2294
2295        /* reverse map the entire object onto the parent */
2296        ret = rbd_obj_calc_img_extents(obj_req, true);
2297        if (ret)
2298                return ret;
2299
2300        obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2301        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2302                obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2303
2304        obj_req->write_state = RBD_OBJ_WRITE_START;
2305        return 0;
2306}
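/*
 * Discard alignment sketch, assuming a hypothetical alloc_size of 64 KiB
 * and the default 4 MiB object size: a discard of 10000~100000 within an
 * object rounds to off = 65536 and next_off = 65536, so off >= next_off
 * and the request is dropped (return 1) -- too small to free anything.
 * A discard of 10000~300000 becomes 65536~196608, i.e. only the fully
 * covered alloc_size units are discarded.
 */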
2307
2308static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2309                                        int which)
2310{
2311        struct rbd_obj_request *obj_req = osd_req->r_priv;
2312        u16 opcode;
2313
2314        if (rbd_obj_is_entire(obj_req)) {
2315                if (obj_req->num_img_extents) {
2316                        if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2317                                osd_req_op_init(osd_req, which++,
2318                                                CEPH_OSD_OP_CREATE, 0);
2319                        opcode = CEPH_OSD_OP_TRUNCATE;
2320                } else {
2321                        rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2322                        osd_req_op_init(osd_req, which++,
2323                                        CEPH_OSD_OP_DELETE, 0);
2324                        opcode = 0;
2325                }
2326        } else {
2327                opcode = truncate_or_zero_opcode(obj_req);
2328        }
2329
2330        if (opcode)
2331                osd_req_op_extent_init(osd_req, which, opcode,
2332                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2333                                       0, 0);
2334}
2335
2336static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2337{
2338        int ret;
2339
2340        /* reverse map the entire object onto the parent */
2341        ret = rbd_obj_calc_img_extents(obj_req, true);
2342        if (ret)
2343                return ret;
2344
2345        if (rbd_obj_copyup_enabled(obj_req))
2346                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2347        if (!obj_req->num_img_extents) {
2348                obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2349                if (rbd_obj_is_entire(obj_req))
2350                        obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2351        }
2352
2353        obj_req->write_state = RBD_OBJ_WRITE_START;
2354        return 0;
2355}
2356
2357static int count_write_ops(struct rbd_obj_request *obj_req)
2358{
2359        struct rbd_img_request *img_req = obj_req->img_request;
2360
2361        switch (img_req->op_type) {
2362        case OBJ_OP_WRITE:
2363                if (!use_object_map(img_req->rbd_dev) ||
2364                    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2365                        return 2; /* setallochint + write/writefull */
2366
2367                return 1; /* write/writefull */
2368        case OBJ_OP_DISCARD:
2369                return 1; /* delete/truncate/zero */
2370        case OBJ_OP_ZEROOUT:
2371                if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2372                    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2373                        return 2; /* create + truncate */
2374
2375                return 1; /* delete/truncate/zero */
2376        default:
2377                BUG();
2378        }
2379}
2380
2381static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2382                                    int which)
2383{
2384        struct rbd_obj_request *obj_req = osd_req->r_priv;
2385
2386        switch (obj_req->img_request->op_type) {
2387        case OBJ_OP_WRITE:
2388                __rbd_osd_setup_write_ops(osd_req, which);
2389                break;
2390        case OBJ_OP_DISCARD:
2391                __rbd_osd_setup_discard_ops(osd_req, which);
2392                break;
2393        case OBJ_OP_ZEROOUT:
2394                __rbd_osd_setup_zeroout_ops(osd_req, which);
2395                break;
2396        default:
2397                BUG();
2398        }
2399}
2400
2401/*
2402 * Prune the list of object requests (adjust offset and/or length, drop
2403 * redundant requests).  Prepare object request state machines and image
2404 * request state machine for execution.
2405 */
2406static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2407{
2408        struct rbd_obj_request *obj_req, *next_obj_req;
2409        int ret;
2410
2411        for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2412                switch (img_req->op_type) {
2413                case OBJ_OP_READ:
2414                        ret = rbd_obj_init_read(obj_req);
2415                        break;
2416                case OBJ_OP_WRITE:
2417                        ret = rbd_obj_init_write(obj_req);
2418                        break;
2419                case OBJ_OP_DISCARD:
2420                        ret = rbd_obj_init_discard(obj_req);
2421                        break;
2422                case OBJ_OP_ZEROOUT:
2423                        ret = rbd_obj_init_zeroout(obj_req);
2424                        break;
2425                default:
2426                        BUG();
2427                }
2428                if (ret < 0)
2429                        return ret;
2430                if (ret > 0) {
2431                        rbd_img_obj_request_del(img_req, obj_req);
2432                        continue;
2433                }
2434        }
2435
2436        img_req->state = RBD_IMG_START;
2437        return 0;
2438}
2439
2440union rbd_img_fill_iter {
2441        struct ceph_bio_iter    bio_iter;
2442        struct ceph_bvec_iter   bvec_iter;
2443};
2444
2445struct rbd_img_fill_ctx {
2446        enum obj_request_type   pos_type;
2447        union rbd_img_fill_iter *pos;
2448        union rbd_img_fill_iter iter;
2449        ceph_object_extent_fn_t set_pos_fn;
2450        ceph_object_extent_fn_t count_fn;
2451        ceph_object_extent_fn_t copy_fn;
2452};
2453
2454static struct ceph_object_extent *alloc_object_extent(void *arg)
2455{
2456        struct rbd_img_request *img_req = arg;
2457        struct rbd_obj_request *obj_req;
2458
2459        obj_req = rbd_obj_request_create();
2460        if (!obj_req)
2461                return NULL;
2462
2463        rbd_img_obj_request_add(img_req, obj_req);
2464        return &obj_req->ex;
2465}
2466
2467/*
2468 * While su != os && sc == 1 is technically not fancy (it's the same
2469 * layout as su == os && sc == 1), we can't use the nocopy path for it
2470 * because ->set_pos_fn() should be called only once per object.
2471 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2472 * treat su != os && sc == 1 as fancy.
2473 */
2474static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2475{
2476        return l->stripe_unit != l->object_size;
2477}
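/*
 * With the usual default layout (stripe_unit == object_size,
 * stripe_count == 1) this returns false and the nocopy path is taken.
 * An image created with, say, a 64 KiB stripe unit is "fancy": each
 * object request then gets its own bvec array (OBJ_REQUEST_OWN_BVECS)
 * into which the provided bio_vecs are split and rearranged in stripe
 * unit chunks -- the data itself is not copied.
 */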
2478
2479static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2480                                       struct ceph_file_extent *img_extents,
2481                                       u32 num_img_extents,
2482                                       struct rbd_img_fill_ctx *fctx)
2483{
2484        u32 i;
2485        int ret;
2486
2487        img_req->data_type = fctx->pos_type;
2488
2489        /*
2490         * Create object requests and set each object request's starting
2491         * position in the provided bio (list) or bio_vec array.
2492         */
2493        fctx->iter = *fctx->pos;
2494        for (i = 0; i < num_img_extents; i++) {
2495                ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2496                                           img_extents[i].fe_off,
2497                                           img_extents[i].fe_len,
2498                                           &img_req->object_extents,
2499                                           alloc_object_extent, img_req,
2500                                           fctx->set_pos_fn, &fctx->iter);
2501                if (ret)
2502                        return ret;
2503        }
2504
2505        return __rbd_img_fill_request(img_req);
2506}
2507
2508/*
2509 * Map a list of image extents to a list of object extents, create the
2510 * corresponding object requests (normally each to a different object,
2511 * but not always) and add them to @img_req.  For each object request,
2512 * set up its data descriptor to point to the corresponding chunk(s) of
2513 * @fctx->pos data buffer.
2514 *
2515 * Because ceph_file_to_extents() will merge adjacent object extents
2516 * together, each object request's data descriptor may point to multiple
2517 * different chunks of @fctx->pos data buffer.
2518 *
2519 * @fctx->pos data buffer is assumed to be large enough.
2520 */
2521static int rbd_img_fill_request(struct rbd_img_request *img_req,
2522                                struct ceph_file_extent *img_extents,
2523                                u32 num_img_extents,
2524                                struct rbd_img_fill_ctx *fctx)
2525{
2526        struct rbd_device *rbd_dev = img_req->rbd_dev;
2527        struct rbd_obj_request *obj_req;
2528        u32 i;
2529        int ret;
2530
2531        if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2532            !rbd_layout_is_fancy(&rbd_dev->layout))
2533                return rbd_img_fill_request_nocopy(img_req, img_extents,
2534                                                   num_img_extents, fctx);
2535
2536        img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2537
2538        /*
2539         * Create object requests and determine ->bvec_count for each object
2540         * request.  Note that ->bvec_count sum over all object requests may
2541         * be greater than the number of bio_vecs in the provided bio (list)
2542         * or bio_vec array because when mapped, those bio_vecs can straddle
2543         * stripe unit boundaries.
2544         */
2545        fctx->iter = *fctx->pos;
2546        for (i = 0; i < num_img_extents; i++) {
2547                ret = ceph_file_to_extents(&rbd_dev->layout,
2548                                           img_extents[i].fe_off,
2549                                           img_extents[i].fe_len,
2550                                           &img_req->object_extents,
2551                                           alloc_object_extent, img_req,
2552                                           fctx->count_fn, &fctx->iter);
2553                if (ret)
2554                        return ret;
2555        }
2556
2557        for_each_obj_request(img_req, obj_req) {
2558                obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2559                                              sizeof(*obj_req->bvec_pos.bvecs),
2560                                              GFP_NOIO);
2561                if (!obj_req->bvec_pos.bvecs)
2562                        return -ENOMEM;
2563        }
2564
2565        /*
2566         * Fill in each object request's private bio_vec array, splitting and
2567         * rearranging the provided bio_vecs in stripe unit chunks as needed.
2568         */
2569        fctx->iter = *fctx->pos;
2570        for (i = 0; i < num_img_extents; i++) {
2571                ret = ceph_iterate_extents(&rbd_dev->layout,
2572                                           img_extents[i].fe_off,
2573                                           img_extents[i].fe_len,
2574                                           &img_req->object_extents,
2575                                           fctx->copy_fn, &fctx->iter);
2576                if (ret)
2577                        return ret;
2578        }
2579
2580        return __rbd_img_fill_request(img_req);
2581}
2582
2583static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2584                               u64 off, u64 len)
2585{
2586        struct ceph_file_extent ex = { off, len };
2587        union rbd_img_fill_iter dummy = {};
2588        struct rbd_img_fill_ctx fctx = {
2589                .pos_type = OBJ_REQUEST_NODATA,
2590                .pos = &dummy,
2591        };
2592
2593        return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2594}
2595
2596static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2597{
2598        struct rbd_obj_request *obj_req =
2599            container_of(ex, struct rbd_obj_request, ex);
2600        struct ceph_bio_iter *it = arg;
2601
2602        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2603        obj_req->bio_pos = *it;
2604        ceph_bio_iter_advance(it, bytes);
2605}
2606
2607static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2608{
2609        struct rbd_obj_request *obj_req =
2610            container_of(ex, struct rbd_obj_request, ex);
2611        struct ceph_bio_iter *it = arg;
2612
2613        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2614        ceph_bio_iter_advance_step(it, bytes, ({
2615                obj_req->bvec_count++;
2616        }));
2618}
2619
2620static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2621{
2622        struct rbd_obj_request *obj_req =
2623            container_of(ex, struct rbd_obj_request, ex);
2624        struct ceph_bio_iter *it = arg;
2625
2626        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2627        ceph_bio_iter_advance_step(it, bytes, ({
2628                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2629                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2630        }));
2631}
2632
2633static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2634                                   struct ceph_file_extent *img_extents,
2635                                   u32 num_img_extents,
2636                                   struct ceph_bio_iter *bio_pos)
2637{
2638        struct rbd_img_fill_ctx fctx = {
2639                .pos_type = OBJ_REQUEST_BIO,
2640                .pos = (union rbd_img_fill_iter *)bio_pos,
2641                .set_pos_fn = set_bio_pos,
2642                .count_fn = count_bio_bvecs,
2643                .copy_fn = copy_bio_bvecs,
2644        };
2645
2646        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2647                                    &fctx);
2648}
2649
2650static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2651                                 u64 off, u64 len, struct bio *bio)
2652{
2653        struct ceph_file_extent ex = { off, len };
2654        struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2655
2656        return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2657}
2658
2659static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2660{
2661        struct rbd_obj_request *obj_req =
2662            container_of(ex, struct rbd_obj_request, ex);
2663        struct ceph_bvec_iter *it = arg;
2664
2665        obj_req->bvec_pos = *it;
2666        ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2667        ceph_bvec_iter_advance(it, bytes);
2668}
2669
2670static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2671{
2672        struct rbd_obj_request *obj_req =
2673            container_of(ex, struct rbd_obj_request, ex);
2674        struct ceph_bvec_iter *it = arg;
2675
2676        ceph_bvec_iter_advance_step(it, bytes, ({
2677                obj_req->bvec_count++;
2678        }));
2679}
2680
2681static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2682{
2683        struct rbd_obj_request *obj_req =
2684            container_of(ex, struct rbd_obj_request, ex);
2685        struct ceph_bvec_iter *it = arg;
2686
2687        ceph_bvec_iter_advance_step(it, bytes, ({
2688                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2689                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2690        }));
2691}
2692
2693static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2694                                     struct ceph_file_extent *img_extents,
2695                                     u32 num_img_extents,
2696                                     struct ceph_bvec_iter *bvec_pos)
2697{
2698        struct rbd_img_fill_ctx fctx = {
2699                .pos_type = OBJ_REQUEST_BVECS,
2700                .pos = (union rbd_img_fill_iter *)bvec_pos,
2701                .set_pos_fn = set_bvec_pos,
2702                .count_fn = count_bvecs,
2703                .copy_fn = copy_bvecs,
2704        };
2705
2706        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2707                                    &fctx);
2708}
2709
2710static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2711                                   struct ceph_file_extent *img_extents,
2712                                   u32 num_img_extents,
2713                                   struct bio_vec *bvecs)
2714{
2715        struct ceph_bvec_iter it = {
2716                .bvecs = bvecs,
2717                .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2718                                                             num_img_extents) },
2719        };
2720
2721        return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2722                                         &it);
2723}
2724
2725static void rbd_img_handle_request_work(struct work_struct *work)
2726{
2727        struct rbd_img_request *img_req =
2728            container_of(work, struct rbd_img_request, work);
2729
2730        rbd_img_handle_request(img_req, img_req->work_result);
2731}
2732
2733static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2734{
2735        INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2736        img_req->work_result = result;
2737        queue_work(rbd_wq, &img_req->work);
2738}
2739
2740static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2741{
2742        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2743
2744        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2745                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2746                return true;
2747        }
2748
2749        dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2750             obj_req->ex.oe_objno);
2751        return false;
2752}
2753
2754static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2755{
2756        struct ceph_osd_request *osd_req;
2757        int ret;
2758
2759        osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2760        if (IS_ERR(osd_req))
2761                return PTR_ERR(osd_req);
2762
2763        osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2764                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2765        rbd_osd_setup_data(osd_req, 0);
2766        rbd_osd_format_read(osd_req);
2767
2768        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2769        if (ret)
2770                return ret;
2771
2772        rbd_osd_submit(osd_req);
2773        return 0;
2774}
2775
2776static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2777{
2778        struct rbd_img_request *img_req = obj_req->img_request;
2779        struct rbd_device *parent = img_req->rbd_dev->parent;
2780        struct rbd_img_request *child_img_req;
2781        int ret;
2782
2783        child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2784        if (!child_img_req)
2785                return -ENOMEM;
2786
2787        rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2788        __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2789        child_img_req->obj_request = obj_req;
2790
2791        down_read(&parent->header_rwsem);
2792        rbd_img_capture_header(child_img_req);
2793        up_read(&parent->header_rwsem);
2794
2795        dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2796             obj_req);
2797
2798        if (!rbd_img_is_write(img_req)) {
2799                switch (img_req->data_type) {
2800                case OBJ_REQUEST_BIO:
2801                        ret = __rbd_img_fill_from_bio(child_img_req,
2802                                                      obj_req->img_extents,
2803                                                      obj_req->num_img_extents,
2804                                                      &obj_req->bio_pos);
2805                        break;
2806                case OBJ_REQUEST_BVECS:
2807                case OBJ_REQUEST_OWN_BVECS:
2808                        ret = __rbd_img_fill_from_bvecs(child_img_req,
2809                                                      obj_req->img_extents,
2810                                                      obj_req->num_img_extents,
2811                                                      &obj_req->bvec_pos);
2812                        break;
2813                default:
2814                        BUG();
2815                }
2816        } else {
2817                ret = rbd_img_fill_from_bvecs(child_img_req,
2818                                              obj_req->img_extents,
2819                                              obj_req->num_img_extents,
2820                                              obj_req->copyup_bvecs);
2821        }
2822        if (ret) {
2823                rbd_img_request_destroy(child_img_req);
2824                return ret;
2825        }
2826
2827        /* avoid parent chain recursion */
2828        rbd_img_schedule(child_img_req, 0);
2829        return 0;
2830}
2831
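/*
 * Read state machine.  Returns true when the object request is done
 * (*result holds its outcome) and false when it is blocked waiting for an
 * OSD reply or a child (parent image) read:
 *
 *	RBD_OBJ_READ_START:  skip the OSD read if the object map says the
 *			     object doesn't exist, otherwise issue it
 *	RBD_OBJ_READ_OBJECT: on -ENOENT, read the missing range from the
 *			     parent if there is an overlap; otherwise
 *			     zero-fill holes and short reads
 *	RBD_OBJ_READ_PARENT: zero-fill whatever lies beyond the parent
 *			     overlap
 */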
2832static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2833{
2834        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2835        int ret;
2836
2837again:
2838        switch (obj_req->read_state) {
2839        case RBD_OBJ_READ_START:
2840                rbd_assert(!*result);
2841
2842                if (!rbd_obj_may_exist(obj_req)) {
2843                        *result = -ENOENT;
2844                        obj_req->read_state = RBD_OBJ_READ_OBJECT;
2845                        goto again;
2846                }
2847
2848                ret = rbd_obj_read_object(obj_req);
2849                if (ret) {
2850                        *result = ret;
2851                        return true;
2852                }
2853                obj_req->read_state = RBD_OBJ_READ_OBJECT;
2854                return false;
2855        case RBD_OBJ_READ_OBJECT:
2856                if (*result == -ENOENT && rbd_dev->parent_overlap) {
2857                        /* reverse map this object extent onto the parent */
2858                        ret = rbd_obj_calc_img_extents(obj_req, false);
2859                        if (ret) {
2860                                *result = ret;
2861                                return true;
2862                        }
2863                        if (obj_req->num_img_extents) {
2864                                ret = rbd_obj_read_from_parent(obj_req);
2865                                if (ret) {
2866                                        *result = ret;
2867                                        return true;
2868                                }
2869                                obj_req->read_state = RBD_OBJ_READ_PARENT;
2870                                return false;
2871                        }
2872                }
2873
2874                /*
2875                 * -ENOENT means a hole in the image -- zero-fill the entire
2876                 * length of the request.  A short read also implies zero-fill
2877                 * to the end of the request.
2878                 */
2879                if (*result == -ENOENT) {
2880                        rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2881                        *result = 0;
2882                } else if (*result >= 0) {
2883                        if (*result < obj_req->ex.oe_len)
2884                                rbd_obj_zero_range(obj_req, *result,
2885                                                obj_req->ex.oe_len - *result);
2886                        else
2887                                rbd_assert(*result == obj_req->ex.oe_len);
2888                        *result = 0;
2889                }
2890                return true;
2891        case RBD_OBJ_READ_PARENT:
2892                /*
2893                 * The parent image is read only up to the overlap -- zero-fill
2894                 * from the overlap to the end of the request.
2895                 */
2896                if (!*result) {
2897                        u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2898
2899                        if (obj_overlap < obj_req->ex.oe_len)
2900                                rbd_obj_zero_range(obj_req, obj_overlap,
2901                                            obj_req->ex.oe_len - obj_overlap);
2902                }
2903                return true;
2904        default:
2905                BUG();
2906        }
2907}
2908
2909static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2910{
2911        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2912
2913        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2914                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2915
2916        if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2917            (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2918                dout("%s %p noop for nonexistent\n", __func__, obj_req);
2919                return true;
2920        }
2921
2922        return false;
2923}
2924
2925/*
2926 * Return:
2927 *   0 - object map update sent
2928 *   1 - object map update isn't needed
2929 *  <0 - error
2930 */
2931static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2932{
2933        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2934        u8 new_state;
2935
2936        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2937                return 1;
2938
2939        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2940                new_state = OBJECT_PENDING;
2941        else
2942                new_state = OBJECT_EXISTS;
2943
2944        return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2945}
2946
2947static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2948{
2949        struct ceph_osd_request *osd_req;
2950        int num_ops = count_write_ops(obj_req);
2951        int which = 0;
2952        int ret;
2953
2954        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2955                num_ops++; /* stat */
2956
2957        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2958        if (IS_ERR(osd_req))
2959                return PTR_ERR(osd_req);
2960
2961        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2962                ret = rbd_osd_setup_stat(osd_req, which++);
2963                if (ret)
2964                        return ret;
2965        }
2966
2967        rbd_osd_setup_write_ops(osd_req, which);
2968        rbd_osd_format_write(osd_req);
2969
2970        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2971        if (ret)
2972                return ret;
2973
2974        rbd_osd_submit(osd_req);
2975        return 0;
2976}
2977
2978/*
2979 * copyup_bvecs pages are never highmem pages
2980 */
2981static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2982{
2983        struct ceph_bvec_iter it = {
2984                .bvecs = bvecs,
2985                .iter = { .bi_size = bytes },
2986        };
2987
2988        ceph_bvec_iter_advance_step(&it, bytes, ({
2989                if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len))
2990                        return false;
2991        }));
2992        return true;
2993}
2994
2995#define MODS_ONLY       U32_MAX
2996
2997static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
2998                                      u32 bytes)
2999{
3000        struct ceph_osd_request *osd_req;
3001        int ret;
3002
3003        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3004        rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3005
3006        osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3007        if (IS_ERR(osd_req))
3008                return PTR_ERR(osd_req);
3009
3010        ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3011        if (ret)
3012                return ret;
3013
3014        rbd_osd_format_write(osd_req);
3015
3016        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3017        if (ret)
3018                return ret;
3019
3020        rbd_osd_submit(osd_req);
3021        return 0;
3022}
3023
3024static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3025                                        u32 bytes)
3026{
3027        struct ceph_osd_request *osd_req;
3028        int num_ops = count_write_ops(obj_req);
3029        int which = 0;
3030        int ret;
3031
3032        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3033
3034        if (bytes != MODS_ONLY)
3035                num_ops++; /* copyup */
3036
3037        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3038        if (IS_ERR(osd_req))
3039                return PTR_ERR(osd_req);
3040
3041        if (bytes != MODS_ONLY) {
3042                ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3043                if (ret)
3044                        return ret;
3045        }
3046
3047        rbd_osd_setup_write_ops(osd_req, which);
3048        rbd_osd_format_write(osd_req);
3049
3050        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3051        if (ret)
3052                return ret;
3053
3054        rbd_osd_submit(osd_req);
3055        return 0;
3056}
3057
3058static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3059{
3060        u32 i;
3061
3062        rbd_assert(!obj_req->copyup_bvecs);
3063        obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3064        obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3065                                        sizeof(*obj_req->copyup_bvecs),
3066                                        GFP_NOIO);
3067        if (!obj_req->copyup_bvecs)
3068                return -ENOMEM;
3069
3070        for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3071                unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3072
3073                obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3074                if (!obj_req->copyup_bvecs[i].bv_page)
3075                        return -ENOMEM;
3076
3077                obj_req->copyup_bvecs[i].bv_offset = 0;
3078                obj_req->copyup_bvecs[i].bv_len = len;
3079                obj_overlap -= len;
3080        }
3081
3082        rbd_assert(!obj_overlap);
3083        return 0;
3084}
3085
3086/*
3087 * The target object doesn't exist.  Read the data for the entire
3088 * target object up to the overlap point (if any) from the parent,
3089 * so we can use it for a copyup.
3090 */
3091static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3092{
3093        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3094        int ret;
3095
3096        rbd_assert(obj_req->num_img_extents);
3097        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3098                      rbd_dev->parent_overlap);
3099        if (!obj_req->num_img_extents) {
3100                /*
3101                 * The overlap has become 0 (most likely because the
3102                 * image has been flattened).  Re-submit the original write
3103                 * request -- pass MODS_ONLY since the copyup isn't needed
3104                 * anymore.
3105                 */
3106                return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3107        }
3108
3109        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3110        if (ret)
3111                return ret;
3112
3113        return rbd_obj_read_from_parent(obj_req);
3114}
3115
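    /*
     * For a copyup, mark the object as existing in the object map of each
     * snapshot in the write's snapshot context.  Nothing to do if the
     * object map feature is disabled or the copyup data is all zeroes.
     * Updates are issued asynchronously and tracked in obj_req->pending.
     */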
3116static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3117{
3118        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3119        struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3120        u8 new_state;
3121        u32 i;
3122        int ret;
3123
3124        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3125
3126        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3127                return;
3128
3129        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3130                return;
3131
3132        for (i = 0; i < snapc->num_snaps; i++) {
3133                if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3134                    i + 1 < snapc->num_snaps)
3135                        new_state = OBJECT_EXISTS_CLEAN;
3136                else
3137                        new_state = OBJECT_EXISTS;
3138
3139                ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3140                                            new_state, NULL);
3141                if (ret < 0) {
3142                        obj_req->pending.result = ret;
3143                        return;
3144                }
3145
3146                rbd_assert(!ret);
3147                obj_req->pending.num_pending++;
3148        }
3149}
3150
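    /*
     * Issue the copyup OSD request(s).  If there are snapshots and the
     * copyup data is not all zeroes, a deep-copyup request with an empty
     * snapshot context is sent first, followed by a second request that
     * carries the actual modification under the current snapshot context.
     */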
3151static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3152{
3153        u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3154        int ret;
3155
3156        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3157
3158        /*
3159         * Only send non-zero copyup data to save some I/O and network
3160         * bandwidth -- zero copyup data is equivalent to the object not
3161         * existing.
3162         */
3163        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3164                bytes = 0;
3165
3166        if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3167                /*
3168                 * Send a copyup request with an empty snapshot context to
3169                 * deep-copyup the object through all existing snapshots.
3170                 * A second request with the current snapshot context will be
3171                 * sent for the actual modification.
3172                 */
3173                ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3174                if (ret) {
3175                        obj_req->pending.result = ret;
3176                        return;
3177                }
3178
3179                obj_req->pending.num_pending++;
3180                bytes = MODS_ONLY;
3181        }
3182
3183        ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3184        if (ret) {
3185                obj_req->pending.result = ret;
3186                return;
3187        }
3188
3189        obj_req->pending.num_pending++;
3190}
3191
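    /*
     * Advance the copyup state machine:
     *
     *   RBD_OBJ_COPYUP_START -> RBD_OBJ_COPYUP_READ_PARENT ->
     *   RBD_OBJ_COPYUP_OBJECT_MAPS -> RBD_OBJ_COPYUP_WRITE_OBJECT
     *
     * Return true if the copyup is completed (*result holds the outcome),
     * false if it is still in progress.
     */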
3192static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3193{
3194        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3195        int ret;
3196
3197again:
3198        switch (obj_req->copyup_state) {
3199        case RBD_OBJ_COPYUP_START:
3200                rbd_assert(!*result);
3201
3202                ret = rbd_obj_copyup_read_parent(obj_req);
3203                if (ret) {
3204                        *result = ret;
3205                        return true;
3206                }
3207                if (obj_req->num_img_extents)
3208                        obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3209                else
3210                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3211                return false;
3212        case RBD_OBJ_COPYUP_READ_PARENT:
3213                if (*result)
3214                        return true;
3215
3216                if (is_zero_bvecs(obj_req->copyup_bvecs,
3217                                  rbd_obj_img_extents_bytes(obj_req))) {
3218                        dout("%s %p detected zeros\n", __func__, obj_req);
3219                        obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3220                }
3221
3222                rbd_obj_copyup_object_maps(obj_req);
3223                if (!obj_req->pending.num_pending) {
3224                        *result = obj_req->pending.result;
3225                        obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3226                        goto again;
3227                }
3228                obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3229                return false;
3230        case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3231                if (!pending_result_dec(&obj_req->pending, result))
3232                        return false;
3233                fallthrough;
3234        case RBD_OBJ_COPYUP_OBJECT_MAPS:
3235                if (*result) {
3236                        rbd_warn(rbd_dev, "snap object map update failed: %d",
3237                                 *result);
3238                        return true;
3239                }
3240
3241                rbd_obj_copyup_write_object(obj_req);
3242                if (!obj_req->pending.num_pending) {
3243                        *result = obj_req->pending.result;
3244                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3245                        goto again;
3246                }
3247                obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3248                return false;
3249        case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3250                if (!pending_result_dec(&obj_req->pending, result))
3251                        return false;
3252                fallthrough;
3253        case RBD_OBJ_COPYUP_WRITE_OBJECT:
3254                return true;
3255        default:
3256                BUG();
3257        }
3258}
3259
3260/*
3261 * Return:
3262 *   0 - object map update sent
3263 *   1 - object map update isn't needed
3264 *  <0 - error
3265 */
3266static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3267{
3268        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3269        u8 current_state = OBJECT_PENDING;
3270
3271        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3272                return 1;
3273
3274        if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3275                return 1;
3276
3277        return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3278                                     &current_state);
3279}
3280
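    /*
     * Advance the object write state machine:
     *
     *   RBD_OBJ_WRITE_START -> RBD_OBJ_WRITE_PRE_OBJECT_MAP ->
     *   RBD_OBJ_WRITE_OBJECT -> RBD_OBJ_WRITE_COPYUP ->
     *   RBD_OBJ_WRITE_POST_OBJECT_MAP
     *
     * An -ENOENT reply to the write kicks off the copyup state machine if
     * copyup is enabled for the request.  Return true if the object
     * request is completed.
     */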
3281static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3282{
3283        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3284        int ret;
3285
3286again:
3287        switch (obj_req->write_state) {
3288        case RBD_OBJ_WRITE_START:
3289                rbd_assert(!*result);
3290
3291                if (rbd_obj_write_is_noop(obj_req))
3292                        return true;
3293
3294                ret = rbd_obj_write_pre_object_map(obj_req);
3295                if (ret < 0) {
3296                        *result = ret;
3297                        return true;
3298                }
3299                obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3300                if (ret > 0)
3301                        goto again;
3302                return false;
3303        case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3304                if (*result) {
3305                        rbd_warn(rbd_dev, "pre object map update failed: %d",
3306                                 *result);
3307                        return true;
3308                }
3309                ret = rbd_obj_write_object(obj_req);
3310                if (ret) {
3311                        *result = ret;
3312                        return true;
3313                }
3314                obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3315                return false;
3316        case RBD_OBJ_WRITE_OBJECT:
3317                if (*result == -ENOENT) {
3318                        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3319                                *result = 0;
3320                                obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3321                                obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3322                                goto again;
3323                        }
3324                        /*
3325                         * On a non-existent object:
3326                         *   delete - -ENOENT, truncate/zero - 0
3327                         */
3328                        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3329                                *result = 0;
3330                }
3331                if (*result)
3332                        return true;
3333
3334                obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3335                goto again;
3336        case __RBD_OBJ_WRITE_COPYUP:
3337                if (!rbd_obj_advance_copyup(obj_req, result))
3338                        return false;
3339                fallthrough;
3340        case RBD_OBJ_WRITE_COPYUP:
3341                if (*result) {
3342                        rbd_warn(rbd_dev, "copyup failed: %d", *result);
3343                        return true;
3344                }
3345                ret = rbd_obj_write_post_object_map(obj_req);
3346                if (ret < 0) {
3347                        *result = ret;
3348                        return true;
3349                }
3350                obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3351                if (ret > 0)
3352                        goto again;
3353                return false;
3354        case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3355                if (*result)
3356                        rbd_warn(rbd_dev, "post object map update failed: %d",
3357                                 *result);
3358                return true;
3359        default:
3360                BUG();
3361        }
3362}
3363
3364/*
3365 * Return true if @obj_req is completed.
3366 */
3367static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3368                                     int *result)
3369{
3370        struct rbd_img_request *img_req = obj_req->img_request;
3371        struct rbd_device *rbd_dev = img_req->rbd_dev;
3372        bool done;
3373
3374        mutex_lock(&obj_req->state_mutex);
3375        if (!rbd_img_is_write(img_req))
3376                done = rbd_obj_advance_read(obj_req, result);
3377        else
3378                done = rbd_obj_advance_write(obj_req, result);
3379        mutex_unlock(&obj_req->state_mutex);
3380
3381        if (done && *result) {
3382                rbd_assert(*result < 0);
3383                rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3384                         obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3385                         obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3386        }
3387        return done;
3388}
3389
3390/*
3391 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3392 * recursion.
3393 */
3394static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3395{
3396        if (__rbd_obj_handle_request(obj_req, &result))
3397                rbd_img_handle_request(obj_req->img_request, result);
3398}
3399
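    /*
     * Return true if @img_req needs the exclusive lock: writes always do,
     * reads only with the lock_on_read option or the object map feature.
     * Read-only mappings and images without the exclusive-lock feature
     * never need it.
     */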
3400static bool need_exclusive_lock(struct rbd_img_request *img_req)
3401{
3402        struct rbd_device *rbd_dev = img_req->rbd_dev;
3403
3404        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3405                return false;
3406
3407        if (rbd_is_ro(rbd_dev))
3408                return false;
3409
3410        rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3411        if (rbd_dev->opts->lock_on_read ||
3412            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3413                return true;
3414
3415        return rbd_img_is_write(img_req);
3416}
3417
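    /*
     * Add @img_req to the acquiring or running list depending on whether
     * the exclusive lock is currently held.  Return true if it is held,
     * i.e. the request may proceed right away.
     */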
3418static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3419{
3420        struct rbd_device *rbd_dev = img_req->rbd_dev;
3421        bool locked;
3422
3423        lockdep_assert_held(&rbd_dev->lock_rwsem);
3424        locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3425        spin_lock(&rbd_dev->lock_lists_lock);
3426        rbd_assert(list_empty(&img_req->lock_item));
3427        if (!locked)
3428                list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3429        else
3430                list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3431        spin_unlock(&rbd_dev->lock_lists_lock);
3432        return locked;
3433}
3434
3435static void rbd_lock_del_request(struct rbd_img_request *img_req)
3436{
3437        struct rbd_device *rbd_dev = img_req->rbd_dev;
3438        bool need_wakeup;
3439
3440        lockdep_assert_held(&rbd_dev->lock_rwsem);
3441        spin_lock(&rbd_dev->lock_lists_lock);
3442        rbd_assert(!list_empty(&img_req->lock_item));
3443        list_del_init(&img_req->lock_item);
3444        need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3445                       list_empty(&rbd_dev->running_list));
3446        spin_unlock(&rbd_dev->lock_lists_lock);
3447        if (need_wakeup)
3448                complete(&rbd_dev->releasing_wait);
3449}
3450
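    /*
     * Return:
     *   1 - exclusive lock isn't needed or is already held, proceed
     *   0 - lock acquisition queued, the request has to wait
     *  <0 - error
     */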
3451static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3452{
3453        struct rbd_device *rbd_dev = img_req->rbd_dev;
3454
3455        if (!need_exclusive_lock(img_req))
3456                return 1;
3457
3458        if (rbd_lock_add_request(img_req))
3459                return 1;
3460
3461        if (rbd_dev->opts->exclusive) {
3462                WARN_ON(1); /* lock got released? */
3463                return -EROFS;
3464        }
3465
3466        /*
3467         * Note the use of mod_delayed_work() in rbd_acquire_lock()
3468         * and cancel_delayed_work() in wake_lock_waiters().
3469         */
3470        dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3471        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3472        return 0;
3473}
3474
3475static void rbd_img_object_requests(struct rbd_img_request *img_req)
3476{
3477        struct rbd_obj_request *obj_req;
3478
3479        rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3480
3481        for_each_obj_request(img_req, obj_req) {
3482                int result = 0;
3483
3484                if (__rbd_obj_handle_request(obj_req, &result)) {
3485                        if (result) {
3486                                img_req->pending.result = result;
3487                                return;
3488                        }
3489                } else {
3490                        img_req->pending.num_pending++;
3491                }
3492        }
3493}
3494
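    /*
     * Advance the image request state machine:
     *
     *   RBD_IMG_START -> RBD_IMG_EXCLUSIVE_LOCK -> RBD_IMG_OBJECT_REQUESTS
     *
     * Return true if the image request is completed, false if it is still
     * waiting for the exclusive lock or for its object requests to finish.
     */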
3495static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3496{
3497        struct rbd_device *rbd_dev = img_req->rbd_dev;
3498        int ret;
3499
3500again:
3501        switch (img_req->state) {
3502        case RBD_IMG_START:
3503                rbd_assert(!*result);
3504
3505                ret = rbd_img_exclusive_lock(img_req);
3506                if (ret < 0) {
3507                        *result = ret;
3508                        return true;
3509                }
3510                img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3511                if (ret > 0)
3512                        goto again;
3513                return false;
3514        case RBD_IMG_EXCLUSIVE_LOCK:
3515                if (*result)
3516                        return true;
3517
3518                rbd_assert(!need_exclusive_lock(img_req) ||
3519                           __rbd_is_lock_owner(rbd_dev));
3520
3521                rbd_img_object_requests(img_req);
3522                if (!img_req->pending.num_pending) {
3523                        *result = img_req->pending.result;
3524                        img_req->state = RBD_IMG_OBJECT_REQUESTS;
3525                        goto again;
3526                }
3527                img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3528                return false;
3529        case __RBD_IMG_OBJECT_REQUESTS:
3530                if (!pending_result_dec(&img_req->pending, result))
3531                        return false;
3532                fallthrough;
3533        case RBD_IMG_OBJECT_REQUESTS:
3534                return true;
3535        default:
3536                BUG();
3537        }
3538}
3539
3540/*
3541 * Return true if @img_req is completed.
3542 */
3543static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3544                                     int *result)
3545{
3546        struct rbd_device *rbd_dev = img_req->rbd_dev;
3547        bool done;
3548
3549        if (need_exclusive_lock(img_req)) {
3550                down_read(&rbd_dev->lock_rwsem);
3551                mutex_lock(&img_req->state_mutex);
3552                done = rbd_img_advance(img_req, result);
3553                if (done)
3554                        rbd_lock_del_request(img_req);
3555                mutex_unlock(&img_req->state_mutex);
3556                up_read(&rbd_dev->lock_rwsem);
3557        } else {
3558                mutex_lock(&img_req->state_mutex);
3559                done = rbd_img_advance(img_req, result);
3560                mutex_unlock(&img_req->state_mutex);
3561        }
3562
3563        if (done && *result) {
3564                rbd_assert(*result < 0);
3565                rbd_warn(rbd_dev, "%s%s result %d",
3566                      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3567                      obj_op_name(img_req->op_type), *result);
3568        }
3569        return done;
3570}
3571
3572static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3573{
3574again:
3575        if (!__rbd_img_handle_request(img_req, &result))
3576                return;
3577
3578        if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3579                struct rbd_obj_request *obj_req = img_req->obj_request;
3580
3581                rbd_img_request_destroy(img_req);
3582                if (__rbd_obj_handle_request(obj_req, &result)) {
3583                        img_req = obj_req->img_request;
3584                        goto again;
3585                }
3586        } else {
3587                struct request *rq = blk_mq_rq_from_pdu(img_req);
3588
3589                rbd_img_request_destroy(img_req);
3590                blk_mq_end_request(rq, errno_to_blk_status(result));
3591        }
3592}
3593
3594static const struct rbd_client_id rbd_empty_cid;
3595
3596static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3597                          const struct rbd_client_id *rhs)
3598{
3599        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3600}
3601
3602static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3603{
3604        struct rbd_client_id cid;
3605
3606        mutex_lock(&rbd_dev->watch_mutex);
3607        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3608        cid.handle = rbd_dev->watch_cookie;
3609        mutex_unlock(&rbd_dev->watch_mutex);
3610        return cid;
3611}
3612
3613/*
3614 * lock_rwsem must be held for write
3615 */
3616static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3617                              const struct rbd_client_id *cid)
3618{
3619        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3620             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3621             cid->gid, cid->handle);
3622        rbd_dev->owner_cid = *cid; /* struct */
3623}
3624
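    /*
     * The lock cookie is "<RBD_LOCK_COOKIE_PREFIX> <watch_cookie>", so it
     * changes whenever the watch is re-established.
     */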
3625static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3626{
3627        mutex_lock(&rbd_dev->watch_mutex);
3628        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3629        mutex_unlock(&rbd_dev->watch_mutex);
3630}
3631
3632static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3633{
3634        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3635
3636        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3637        strcpy(rbd_dev->lock_cookie, cookie);
3638        rbd_set_owner_cid(rbd_dev, &cid);
3639        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3640}
3641
3642/*
3643 * lock_rwsem must be held for write
3644 */
3645static int rbd_lock(struct rbd_device *rbd_dev)
3646{
3647        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3648        char cookie[32];
3649        int ret;
3650
3651        WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3652                rbd_dev->lock_cookie[0] != '\0');
3653
3654        format_lock_cookie(rbd_dev, cookie);
3655        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3656                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3657                            RBD_LOCK_TAG, "", 0);
3658        if (ret)
3659                return ret;
3660
3661        __rbd_lock(rbd_dev, cookie);
3662        return 0;
3663}
3664
3665/*
3666 * lock_rwsem must be held for write
3667 */
3668static void rbd_unlock(struct rbd_device *rbd_dev)
3669{
3670        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3671        int ret;
3672
3673        WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3674                rbd_dev->lock_cookie[0] == '\0');
3675
3676        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3677                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
3678        if (ret && ret != -ENOENT)
3679                rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3680
3681        /* treat errors as if the image were unlocked */
3682        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3683        rbd_dev->lock_cookie[0] = '\0';
3684        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3685        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3686}
3687
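    /*
     * Send a notify on the header object.  The payload is an encoded
     * NotifyMessage: a 32-bit notify op followed by the ClientId (64-bit
     * gid + 64-bit handle), hence the 4 + 8 + 8 byte buffer plus the
     * encoding header.
     */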
3688static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3689                                enum rbd_notify_op notify_op,
3690                                struct page ***preply_pages,
3691                                size_t *preply_len)
3692{
3693        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3694        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3695        char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3696        int buf_size = sizeof(buf);
3697        void *p = buf;
3698
3699        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3700
3701        /* encode *LockPayload NotifyMessage (op + ClientId) */
3702        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3703        ceph_encode_32(&p, notify_op);
3704        ceph_encode_64(&p, cid.gid);
3705        ceph_encode_64(&p, cid.handle);
3706
3707        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3708                                &rbd_dev->header_oloc, buf, buf_size,
3709                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3710}
3711
3712static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3713                               enum rbd_notify_op notify_op)
3714{
3715        __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3716}
3717
3718static void rbd_notify_acquired_lock(struct work_struct *work)
3719{
3720        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3721                                                  acquired_lock_work);
3722
3723        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3724}
3725
3726static void rbd_notify_released_lock(struct work_struct *work)
3727{
3728        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3729                                                  released_lock_work);
3730
3731        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3732}
3733
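    /*
     * Ask the current lock owner to release the lock.  Return the owner's
     * ResponseMessage result (0 if it agreed to release, -EROFS if it
     * refused), -ETIMEDOUT if no lock owner responded, or another
     * negative error code.
     */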
3734static int rbd_request_lock(struct rbd_device *rbd_dev)
3735{
3736        struct page **reply_pages;
3737        size_t reply_len;
3738        bool lock_owner_responded = false;
3739        int ret;
3740
3741        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3742
3743        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3744                                   &reply_pages, &reply_len);
3745        if (ret && ret != -ETIMEDOUT) {
3746                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3747                goto out;
3748        }
3749
3750        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3751                void *p = page_address(reply_pages[0]);
3752                void *const end = p + reply_len;
3753                u32 n;
3754
3755                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3756                while (n--) {
3757                        u8 struct_v;
3758                        u32 len;
3759
3760                        ceph_decode_need(&p, end, 8 + 8, e_inval);
3761                        p += 8 + 8; /* skip gid and cookie */
3762
3763                        ceph_decode_32_safe(&p, end, len, e_inval);
3764                        if (!len)
3765                                continue;
3766
3767                        if (lock_owner_responded) {
3768                                rbd_warn(rbd_dev,
3769                                         "duplicate lock owners detected");
3770                                ret = -EIO;
3771                                goto out;
3772                        }
3773
3774                        lock_owner_responded = true;
3775                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3776                                                  &struct_v, &len);
3777                        if (ret) {
3778                                rbd_warn(rbd_dev,
3779                                         "failed to decode ResponseMessage: %d",
3780                                         ret);
3781                                goto e_inval;
3782                        }
3783
3784                        ret = ceph_decode_32(&p);
3785                }
3786        }
3787
3788        if (!lock_owner_responded) {
3789                rbd_warn(rbd_dev, "no lock owners detected");
3790                ret = -ETIMEDOUT;
3791        }
3792
3793out:
3794        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3795        return ret;
3796
3797e_inval:
3798        ret = -EINVAL;
3799        goto out;
3800}
3801
3802/*
3803 * Wake up lock waiters: either image request state machine(s) or
3804 * rbd_add_acquire_lock() (i.e. "rbd map").
3805 */
3806static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3807{
3808        struct rbd_img_request *img_req;
3809
3810        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3811        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3812
3813        cancel_delayed_work(&rbd_dev->lock_dwork);
3814        if (!completion_done(&rbd_dev->acquire_wait)) {
3815                rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3816                           list_empty(&rbd_dev->running_list));
3817                rbd_dev->acquire_err = result;
3818                complete_all(&rbd_dev->acquire_wait);
3819                return;
3820        }
3821
3822        list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3823                mutex_lock(&img_req->state_mutex);
3824                rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3825                rbd_img_schedule(img_req, result);
3826                mutex_unlock(&img_req->state_mutex);
3827        }
3828
3829        list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3830}
3831
3832static int get_lock_owner_info(struct rbd_device *rbd_dev,
3833                               struct ceph_locker **lockers, u32 *num_lockers)
3834{
3835        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3836        u8 lock_type;
3837        char *lock_tag;
3838        int ret;
3839
3840        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3841
3842        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3843                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3844                                 &lock_type, &lock_tag, lockers, num_lockers);
3845        if (ret)
3846                return ret;
3847
3848        if (*num_lockers == 0) {
3849                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3850                goto out;
3851        }
3852
3853        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3854                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3855                         lock_tag);
3856                ret = -EBUSY;
3857                goto out;
3858        }
3859
3860        if (lock_type == CEPH_CLS_LOCK_SHARED) {
3861                rbd_warn(rbd_dev, "shared lock type detected");
3862                ret = -EBUSY;
3863                goto out;
3864        }
3865
3866        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3867                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3868                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3869                         (*lockers)[0].id.cookie);
3870                ret = -EBUSY;
3871                goto out;
3872        }
3873
3874out:
3875        kfree(lock_tag);
3876        return ret;
3877}
3878
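    /*
     * Return 1 if the locker identified by @locker still has a watch
     * established on the header object (i.e. it appears to be alive),
     * 0 if it doesn't, or a negative error code.
     */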
3879static int find_watcher(struct rbd_device *rbd_dev,
3880                        const struct ceph_locker *locker)
3881{
3882        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3883        struct ceph_watch_item *watchers;
3884        u32 num_watchers;
3885        u64 cookie;
3886        int i;
3887        int ret;
3888
3889        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3890                                      &rbd_dev->header_oloc, &watchers,
3891                                      &num_watchers);
3892        if (ret)
3893                return ret;
3894
3895        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3896        for (i = 0; i < num_watchers; i++) {
3897                /*
3898                 * Ignore addr->type while comparing.  This mimics
3899                 * entity_addr_t::get_legacy_str() + strcmp().
3900                 */
3901                if (ceph_addr_equal_no_type(&watchers[i].addr,
3902                                            &locker->info.addr) &&
3903                    watchers[i].cookie == cookie) {
3904                        struct rbd_client_id cid = {
3905                                .gid = le64_to_cpu(watchers[i].name.num),
3906                                .handle = cookie,
3907                        };
3908
3909                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3910                             rbd_dev, cid.gid, cid.handle);
3911                        rbd_set_owner_cid(rbd_dev, &cid);
3912                        ret = 1;
3913                        goto out;
3914                }
3915        }
3916
3917        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3918        ret = 0;
3919out:
3920        kfree(watchers);
3921        return ret;
3922}
3923
3924/*
3925 * lock_rwsem must be held for write
3926 */
3927static int rbd_try_lock(struct rbd_device *rbd_dev)
3928{
3929        struct ceph_client *client = rbd_dev->rbd_client->client;
3930        struct ceph_locker *lockers;
3931        u32 num_lockers;
3932        int ret;
3933
3934        for (;;) {
3935                ret = rbd_lock(rbd_dev);
3936                if (ret != -EBUSY)
3937                        return ret;
3938
3939                /* determine if the current lock holder is still alive */
3940                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3941                if (ret)
3942                        return ret;
3943
3944                if (num_lockers == 0)
3945                        goto again;
3946
3947                ret = find_watcher(rbd_dev, lockers);
3948                if (ret)
3949                        goto out; /* request lock or error */
3950
3951                rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3952                         ENTITY_NAME(lockers[0].id.name));
3953
3954                ret = ceph_monc_blocklist_add(&client->monc,
3955                                              &lockers[0].info.addr);
3956                if (ret) {
3957                        rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3958                                 ENTITY_NAME(lockers[0].id.name), ret);
3959                        goto out;
3960                }
3961
3962                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3963                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
3964                                          lockers[0].id.cookie,
3965                                          &lockers[0].id.name);
3966                if (ret && ret != -ENOENT)
3967                        goto out;
3968
3969again:
3970                ceph_free_lockers(lockers, num_lockers);
3971        }
3972
3973out:
3974        ceph_free_lockers(lockers, num_lockers);
3975        return ret;
3976}
3977
3978static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
3979{
3980        int ret;
3981
3982        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
3983                ret = rbd_object_map_open(rbd_dev);
3984                if (ret)
3985                        return ret;
3986        }
3987
3988        return 0;
3989}
3990
3991/*
3992 * Return:
3993 *   0 - lock acquired
3994 *   1 - caller should call rbd_request_lock()
3995 *  <0 - error
3996 */
3997static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
3998{
3999        int ret;
4000
4001        down_read(&rbd_dev->lock_rwsem);
4002        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4003             rbd_dev->lock_state);
4004        if (__rbd_is_lock_owner(rbd_dev)) {
4005                up_read(&rbd_dev->lock_rwsem);
4006                return 0;
4007        }
4008
4009        up_read(&rbd_dev->lock_rwsem);
4010        down_write(&rbd_dev->lock_rwsem);
4011        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4012             rbd_dev->lock_state);
4013        if (__rbd_is_lock_owner(rbd_dev)) {
4014                up_write(&rbd_dev->lock_rwsem);
4015                return 0;
4016        }
4017
4018        ret = rbd_try_lock(rbd_dev);
4019        if (ret < 0) {
4020                rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4021                if (ret == -EBLOCKLISTED)
4022                        goto out;
4023
4024                ret = 1; /* request lock anyway */
4025        }
4026        if (ret > 0) {
4027                up_write(&rbd_dev->lock_rwsem);
4028                return ret;
4029        }
4030
4031        rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4032        rbd_assert(list_empty(&rbd_dev->running_list));
4033
4034        ret = rbd_post_acquire_action(rbd_dev);
4035        if (ret) {
4036                rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4037                /*
4038                 * Can't stay in RBD_LOCK_STATE_LOCKED because
4039                 * rbd_lock_add_request() would let the request through,
4040                 * assuming that e.g. object map is locked and loaded.
4041                 */
4042                rbd_unlock(rbd_dev);
4043        }
4044
4045out:
4046        wake_lock_waiters(rbd_dev, ret);
4047        up_write(&rbd_dev->lock_rwsem);
4048        return ret;
4049}
4050
4051static void rbd_acquire_lock(struct work_struct *work)
4052{
4053        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4054                                            struct rbd_device, lock_dwork);
4055        int ret;
4056
4057        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4058again:
4059        ret = rbd_try_acquire_lock(rbd_dev);
4060        if (ret <= 0) {
4061                dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4062                return;
4063        }
4064
4065        ret = rbd_request_lock(rbd_dev);
4066        if (ret == -ETIMEDOUT) {
4067                goto again; /* treat this as a dead client */
4068        } else if (ret == -EROFS) {
4069                rbd_warn(rbd_dev, "peer will not release lock");
4070                down_write(&rbd_dev->lock_rwsem);
4071                wake_lock_waiters(rbd_dev, ret);
4072                up_write(&rbd_dev->lock_rwsem);
4073        } else if (ret < 0) {
4074                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4075                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4076                                 RBD_RETRY_DELAY);
4077        } else {
4078                /*
4079                 * lock owner acked, but resend if we don't see them
4080                 * release the lock
4081                 */
4082                dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4083                     rbd_dev);
4084                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4085                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4086        }
4087}
4088
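    /*
     * Transition to RBD_LOCK_STATE_RELEASING and wait for in-flight image
     * requests on the running list to drain.  Return true if the lock can
     * now be released, false if it wasn't held or was lost while waiting.
     */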
4089static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4090{
4091        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4092        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4093
4094        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4095                return false;
4096
4097        /*
4098         * Ensure that all in-flight IO is flushed.
4099         */
4100        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4101        rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4102        if (list_empty(&rbd_dev->running_list))
4103                return true;
4104
4105        up_write(&rbd_dev->lock_rwsem);
4106        wait_for_completion(&rbd_dev->releasing_wait);
4107
4108        down_write(&rbd_dev->lock_rwsem);
4109        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4110                return false;
4111
4112        rbd_assert(list_empty(&rbd_dev->running_list));
4113        return true;
4114}
4115
4116static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4117{
4118        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4119                rbd_object_map_close(rbd_dev);
4120}
4121
4122static void __rbd_release_lock(struct rbd_device *rbd_dev)
4123{
4124        rbd_assert(list_empty(&rbd_dev->running_list));
4125
4126        rbd_pre_release_action(rbd_dev);
4127        rbd_unlock(rbd_dev);
4128}
4129
4130/*
4131 * lock_rwsem must be held for write
4132 */
4133static void rbd_release_lock(struct rbd_device *rbd_dev)
4134{
4135        if (!rbd_quiesce_lock(rbd_dev))
4136                return;
4137
4138        __rbd_release_lock(rbd_dev);
4139
4140        /*
4141         * Give others a chance to grab the lock - we would re-acquire
4142         * almost immediately if we got new IO while draining the running
4143         * list otherwise.  We need to ack our own notifications, so this
4144         * lock_dwork will be requeued from rbd_handle_released_lock() by
4145         * way of maybe_kick_acquire().
4146         */
4147        cancel_delayed_work(&rbd_dev->lock_dwork);
4148}
4149
4150static void rbd_release_lock_work(struct work_struct *work)
4151{
4152        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4153                                                  unlock_work);
4154
4155        down_write(&rbd_dev->lock_rwsem);
4156        rbd_release_lock(rbd_dev);
4157        up_write(&rbd_dev->lock_rwsem);
4158}
4159
4160static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4161{
4162        bool have_requests;
4163
4164        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4165        if (__rbd_is_lock_owner(rbd_dev))
4166                return;
4167
4168        spin_lock(&rbd_dev->lock_lists_lock);
4169        have_requests = !list_empty(&rbd_dev->acquiring_list);
4170        spin_unlock(&rbd_dev->lock_lists_lock);
4171        if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4172                dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4173                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4174        }
4175}
4176
4177static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4178                                     void **p)
4179{
4180        struct rbd_client_id cid = { 0 };
4181
4182        if (struct_v >= 2) {
4183                cid.gid = ceph_decode_64(p);
4184                cid.handle = ceph_decode_64(p);
4185        }
4186
4187        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4188             cid.handle);
4189        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4190                down_write(&rbd_dev->lock_rwsem);
4191                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4192                        dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4193                             __func__, rbd_dev, cid.gid, cid.handle);
4194                } else {
4195                        rbd_set_owner_cid(rbd_dev, &cid);
4196                }
4197                downgrade_write(&rbd_dev->lock_rwsem);
4198        } else {
4199                down_read(&rbd_dev->lock_rwsem);
4200        }
4201
4202        maybe_kick_acquire(rbd_dev);
4203        up_read(&rbd_dev->lock_rwsem);
4204}
4205
4206static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4207                                     void **p)
4208{
4209        struct rbd_client_id cid = { 0 };
4210
4211        if (struct_v >= 2) {
4212                cid.gid = ceph_decode_64(p);
4213                cid.handle = ceph_decode_64(p);
4214        }
4215
4216        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4217             cid.handle);
4218        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4219                down_write(&rbd_dev->lock_rwsem);
4220                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4221                        dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4222                             __func__, rbd_dev, cid.gid, cid.handle,
4223                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4224                } else {
4225                        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4226                }
4227                downgrade_write(&rbd_dev->lock_rwsem);
4228        } else {
4229                down_read(&rbd_dev->lock_rwsem);
4230        }
4231
4232        maybe_kick_acquire(rbd_dev);
4233        up_read(&rbd_dev->lock_rwsem);
4234}
4235
4236/*
4237 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4238 * ResponseMessage is needed.
4239 */
4240static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4241                                   void **p)
4242{
4243        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4244        struct rbd_client_id cid = { 0 };
4245        int result = 1;
4246
4247        if (struct_v >= 2) {
4248                cid.gid = ceph_decode_64(p);
4249                cid.handle = ceph_decode_64(p);
4250        }
4251
4252        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4253             cid.handle);
4254        if (rbd_cid_equal(&cid, &my_cid))
4255                return result;
4256
4257        down_read(&rbd_dev->lock_rwsem);
4258        if (__rbd_is_lock_owner(rbd_dev)) {
4259                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4260                    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4261                        goto out_unlock;
4262
4263                /*
4264                 * encode ResponseMessage(0) so the peer can detect
4265                 * a missing owner
4266                 */
4267                result = 0;
4268
4269                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4270                        if (!rbd_dev->opts->exclusive) {
4271                                dout("%s rbd_dev %p queueing unlock_work\n",
4272                                     __func__, rbd_dev);
4273                                queue_work(rbd_dev->task_wq,
4274                                           &rbd_dev->unlock_work);
4275                        } else {
4276                                /* refuse to release the lock */
4277                                result = -EROFS;
4278                        }
4279                }
4280        }
4281
4282out_unlock:
4283        up_read(&rbd_dev->lock_rwsem);
4284        return result;
4285}
4286
4287static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4288                                     u64 notify_id, u64 cookie, s32 *result)
4289{
4290        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4291        char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4292        int buf_size = sizeof(buf);
4293        int ret;
4294
4295        if (result) {
4296                void *p = buf;
4297
4298                /* encode ResponseMessage */
4299                ceph_start_encoding(&p, 1, 1,
4300                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
4301                ceph_encode_32(&p, *result);
4302        } else {
4303                buf_size = 0;
4304        }
4305
4306        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4307                                   &rbd_dev->header_oloc, notify_id, cookie,
4308                                   buf, buf_size);
4309        if (ret)
4310                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4311}
4312
4313static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4314                                   u64 cookie)
4315{
4316        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4317        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4318}
4319
4320static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4321                                          u64 notify_id, u64 cookie, s32 result)
4322{
4323        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4324        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4325}
4326
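    /*
     * Watch callback: decode the NotifyMessage and dispatch on the notify
     * op.  A zero-length payload is a legacy header-update notification.
     * Every notify is acknowledged, with a ResponseMessage result where
     * one is expected.
     */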
4327static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4328                         u64 notifier_id, void *data, size_t data_len)
4329{
4330        struct rbd_device *rbd_dev = arg;
4331        void *p = data;
4332        void *const end = p + data_len;
4333        u8 struct_v = 0;
4334        u32 len;
4335        u32 notify_op;
4336        int ret;
4337
4338        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4339             __func__, rbd_dev, cookie, notify_id, data_len);
4340        if (data_len) {
4341                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4342                                          &struct_v, &len);
4343                if (ret) {
4344                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4345                                 ret);
4346                        return;
4347                }
4348
4349                notify_op = ceph_decode_32(&p);
4350        } else {
4351                /* legacy notification for header updates */
4352                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4353                len = 0;
4354        }
4355
4356        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4357        switch (notify_op) {
4358        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4359                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4360                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4361                break;
4362        case RBD_NOTIFY_OP_RELEASED_LOCK:
4363                rbd_handle_released_lock(rbd_dev, struct_v, &p);
4364                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4365                break;
4366        case RBD_NOTIFY_OP_REQUEST_LOCK:
4367                ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4368                if (ret <= 0)
4369                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4370                                                      cookie, ret);
4371                else
4372                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4373                break;
4374        case RBD_NOTIFY_OP_HEADER_UPDATE:
4375                ret = rbd_dev_refresh(rbd_dev);
4376                if (ret)
4377                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
4378
4379                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4380                break;
4381        default:
4382                if (rbd_is_lock_owner(rbd_dev))
4383                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4384                                                      cookie, -EOPNOTSUPP);
4385                else
4386                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4387                break;
4388        }
4389}
4390
4391static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4392
4393static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4394{
4395        struct rbd_device *rbd_dev = arg;
4396
4397        rbd_warn(rbd_dev, "encountered watch error: %d", err);
4398
4399        down_write(&rbd_dev->lock_rwsem);
4400        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4401        up_write(&rbd_dev->lock_rwsem);
4402
4403        mutex_lock(&rbd_dev->watch_mutex);
4404        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4405                __rbd_unregister_watch(rbd_dev);
4406                rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4407
4408                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4409        }
4410        mutex_unlock(&rbd_dev->watch_mutex);
4411}
4412
4413/*
4414 * watch_mutex must be locked
4415 */
4416static int __rbd_register_watch(struct rbd_device *rbd_dev)
4417{
4418        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4419        struct ceph_osd_linger_request *handle;
4420
4421        rbd_assert(!rbd_dev->watch_handle);
4422        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4423
4424        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4425                                 &rbd_dev->header_oloc, rbd_watch_cb,
4426                                 rbd_watch_errcb, rbd_dev);
4427        if (IS_ERR(handle))
4428                return PTR_ERR(handle);
4429
4430        rbd_dev->watch_handle = handle;
4431        return 0;
4432}
4433
4434/*
4435 * watch_mutex must be locked
4436 */
4437static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4438{
4439        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4440        int ret;
4441
4442        rbd_assert(rbd_dev->watch_handle);
4443        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4444
4445        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4446        if (ret)
4447                rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4448
4449        rbd_dev->watch_handle = NULL;
4450}
4451
4452static int rbd_register_watch(struct rbd_device *rbd_dev)
4453{
4454        int ret;
4455
4456        mutex_lock(&rbd_dev->watch_mutex);
4457        rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4458        ret = __rbd_register_watch(rbd_dev);
4459        if (ret)
4460                goto out;
4461
4462        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4463        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4464
4465out:
4466        mutex_unlock(&rbd_dev->watch_mutex);
4467        return ret;
4468}
4469
4470static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4471{
4472        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4473
4474        cancel_work_sync(&rbd_dev->acquired_lock_work);
4475        cancel_work_sync(&rbd_dev->released_lock_work);
4476        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4477        cancel_work_sync(&rbd_dev->unlock_work);
4478}
4479
4480/*
4481 * header_rwsem must not be held to avoid a deadlock with
4482 * rbd_dev_refresh() when flushing notifies.
4483 */
4484static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4485{
4486        cancel_tasks_sync(rbd_dev);
4487
4488        mutex_lock(&rbd_dev->watch_mutex);
4489        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4490                __rbd_unregister_watch(rbd_dev);
4491        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4492        mutex_unlock(&rbd_dev->watch_mutex);
4493
4494        cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4495        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4496}
4497
4498/*
4499 * lock_rwsem must be held for write
4500 */
4501static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4502{
4503        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4504        char cookie[32];
4505        int ret;
4506
4507        if (!rbd_quiesce_lock(rbd_dev))
4508                return;
4509
4510        format_lock_cookie(rbd_dev, cookie);
4511        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4512                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4513                                  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4514                                  RBD_LOCK_TAG, cookie);
4515        if (ret) {
4516                if (ret != -EOPNOTSUPP)
4517                        rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4518                                 ret);
4519
4520                /*
4521                 * Lock cookie cannot be updated on older OSDs, so do
4522                 * a manual release and queue an acquire.
4523                 */
4524                __rbd_release_lock(rbd_dev);
4525                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4526        } else {
4527                __rbd_lock(rbd_dev, cookie);
4528                wake_lock_waiters(rbd_dev, 0);
4529        }
4530}
4531
4532static void rbd_reregister_watch(struct work_struct *work)
4533{
4534        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4535                                            struct rbd_device, watch_dwork);
4536        int ret;
4537
4538        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4539
4540        mutex_lock(&rbd_dev->watch_mutex);
4541        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4542                mutex_unlock(&rbd_dev->watch_mutex);
4543                return;
4544        }
4545
4546        ret = __rbd_register_watch(rbd_dev);
4547        if (ret) {
4548                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4549                if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4550                        queue_delayed_work(rbd_dev->task_wq,
4551                                           &rbd_dev->watch_dwork,
4552                                           RBD_RETRY_DELAY);
4553                        mutex_unlock(&rbd_dev->watch_mutex);
4554                        return;
4555                }
4556
4557                mutex_unlock(&rbd_dev->watch_mutex);
4558                down_write(&rbd_dev->lock_rwsem);
4559                wake_lock_waiters(rbd_dev, ret);
4560                up_write(&rbd_dev->lock_rwsem);
4561                return;
4562        }
4563
4564        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4565        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4566        mutex_unlock(&rbd_dev->watch_mutex);
4567
4568        down_write(&rbd_dev->lock_rwsem);
4569        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4570                rbd_reacquire_lock(rbd_dev);
4571        up_write(&rbd_dev->lock_rwsem);
4572
4573        ret = rbd_dev_refresh(rbd_dev);
4574        if (ret)
4575                rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4576}
4577
4578/*
4579 * Synchronous osd object method call.  Returns the number of bytes
4580 * of reply data copied into the inbound buffer, or a negative error code.
4581 */
4582static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4583                             struct ceph_object_id *oid,
4584                             struct ceph_object_locator *oloc,
4585                             const char *method_name,
4586                             const void *outbound,
4587                             size_t outbound_size,
4588                             void *inbound,
4589                             size_t inbound_size)
4590{
4591        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4592        struct page *req_page = NULL;
4593        struct page *reply_page;
4594        int ret;
4595
4596        /*
4597         * Method calls are ultimately read operations.  The result
4598          * should be placed into the inbound buffer provided.  They
4599          * may also supply outbound data--parameters for the object
4600          * method.  Currently, if outbound data is present it will
4601          * be a snapshot id.
4602         */
4603        if (outbound) {
4604                if (outbound_size > PAGE_SIZE)
4605                        return -E2BIG;
4606
4607                req_page = alloc_page(GFP_KERNEL);
4608                if (!req_page)
4609                        return -ENOMEM;
4610
4611                memcpy(page_address(req_page), outbound, outbound_size);
4612        }
4613
4614        reply_page = alloc_page(GFP_KERNEL);
4615        if (!reply_page) {
4616                if (req_page)
4617                        __free_page(req_page);
4618                return -ENOMEM;
4619        }
4620
4621        ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4622                             CEPH_OSD_FLAG_READ, req_page, outbound_size,
4623                             &reply_page, &inbound_size);
4624        if (!ret) {
4625                memcpy(inbound, page_address(reply_page), inbound_size);
4626                ret = inbound_size;
4627        }
4628
4629        if (req_page)
4630                __free_page(req_page);
4631        __free_page(reply_page);
4632        return ret;
4633}
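
/*
 * A typical call, sketched from the v2 header helpers further down
 * (e.g. _rbd_dev_v2_snap_size()):
 *
 *      __le64 snapid = cpu_to_le64(snap_id);
 *
 *      ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *                                &rbd_dev->header_oloc, "get_size",
 *                                &snapid, sizeof(snapid),
 *                                &size_buf, sizeof(size_buf));
 *
 * A non-negative return value is the number of reply bytes copied into
 * the inbound buffer; callers check it against the expected reply size
 * before decoding.
 */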
4634
4635static void rbd_queue_workfn(struct work_struct *work)
4636{
4637        struct rbd_img_request *img_request =
4638            container_of(work, struct rbd_img_request, work);
4639        struct rbd_device *rbd_dev = img_request->rbd_dev;
4640        enum obj_operation_type op_type = img_request->op_type;
4641        struct request *rq = blk_mq_rq_from_pdu(img_request);
4642        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4643        u64 length = blk_rq_bytes(rq);
4644        u64 mapping_size;
4645        int result;
4646
4647        /* Ignore/skip any zero-length requests */
4648        if (!length) {
4649                dout("%s: zero-length request\n", __func__);
4650                result = 0;
4651                goto err_img_request;
4652        }
4653
4654        blk_mq_start_request(rq);
4655
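        /*
         * Snapshot the mapping size and capture the image header under
         * header_rwsem so this request is built against a consistent view
         * even if a header refresh races with it.
         */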
4656        down_read(&rbd_dev->header_rwsem);
4657        mapping_size = rbd_dev->mapping.size;
4658        rbd_img_capture_header(img_request);
4659        up_read(&rbd_dev->header_rwsem);
4660
4661        if (offset + length > mapping_size) {
4662                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4663                         length, mapping_size);
4664                result = -EIO;
4665                goto err_img_request;
4666        }
4667
4668        dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4669             img_request, obj_op_name(op_type), offset, length);
4670
4671        if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4672                result = rbd_img_fill_nodata(img_request, offset, length);
4673        else
4674                result = rbd_img_fill_from_bio(img_request, offset, length,
4675                                               rq->bio);
4676        if (result)
4677                goto err_img_request;
4678
4679        rbd_img_handle_request(img_request, 0);
4680        return;
4681
4682err_img_request:
4683        rbd_img_request_destroy(img_request);
4684        if (result)
4685                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4686                         obj_op_name(op_type), length, offset, result);
4687        blk_mq_end_request(rq, errno_to_blk_status(result));
4688}
4689
4690static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4691                const struct blk_mq_queue_data *bd)
4692{
4693        struct rbd_device *rbd_dev = hctx->queue->queuedata;
4694        struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4695        enum obj_operation_type op_type;
4696
4697        switch (req_op(bd->rq)) {
4698        case REQ_OP_DISCARD:
4699                op_type = OBJ_OP_DISCARD;
4700                break;
4701        case REQ_OP_WRITE_ZEROES:
4702                op_type = OBJ_OP_ZEROOUT;
4703                break;
4704        case REQ_OP_WRITE:
4705                op_type = OBJ_OP_WRITE;
4706                break;
4707        case REQ_OP_READ:
4708                op_type = OBJ_OP_READ;
4709                break;
4710        default:
4711                rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4712                return BLK_STS_IOERR;
4713        }
4714
4715        rbd_img_request_init(img_req, rbd_dev, op_type);
4716
4717        if (rbd_img_is_write(img_req)) {
4718                if (rbd_is_ro(rbd_dev)) {
4719                        rbd_warn(rbd_dev, "%s on read-only mapping",
4720                                 obj_op_name(img_req->op_type));
4721                        return BLK_STS_IOERR;
4722                }
4723                rbd_assert(!rbd_is_snap(rbd_dev));
4724        }
4725
4726        INIT_WORK(&img_req->work, rbd_queue_workfn);
4727        queue_work(rbd_wq, &img_req->work);
4728        return BLK_STS_OK;
4729}
4730
4731static void rbd_free_disk(struct rbd_device *rbd_dev)
4732{
4733        blk_cleanup_disk(rbd_dev->disk);
4734        blk_mq_free_tag_set(&rbd_dev->tag_set);
4735        rbd_dev->disk = NULL;
4736}
4737
4738static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4739                             struct ceph_object_id *oid,
4740                             struct ceph_object_locator *oloc,
4741                             void *buf, int buf_len)
4742
4743{
4744        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4745        struct ceph_osd_request *req;
4746        struct page **pages;
4747        int num_pages = calc_pages_for(0, buf_len);
4748        int ret;
4749
4750        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4751        if (!req)
4752                return -ENOMEM;
4753
4754        ceph_oid_copy(&req->r_base_oid, oid);
4755        ceph_oloc_copy(&req->r_base_oloc, oloc);
4756        req->r_flags = CEPH_OSD_FLAG_READ;
4757
4758        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4759        if (IS_ERR(pages)) {
4760                ret = PTR_ERR(pages);
4761                goto out_req;
4762        }
4763
4764        osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4765        osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4766                                         true);
4767
4768        ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4769        if (ret)
4770                goto out_req;
4771
4772        ceph_osdc_start_request(osdc, req, false);
4773        ret = ceph_osdc_wait_request(osdc, req);
4774        if (ret >= 0)
4775                ceph_copy_from_page_vector(pages, buf, 0, ret);
4776
4777out_req:
4778        ceph_osdc_put_request(req);
4779        return ret;
4780}
4781
4782/*
4783 * Read the complete header for the given rbd device.  On successful
4784 * return, the rbd_dev->header field will contain up-to-date
4785 * information about the image.
4786 */
4787static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4788{
4789        struct rbd_image_header_ondisk *ondisk = NULL;
4790        u32 snap_count = 0;
4791        u64 names_size = 0;
4792        u32 want_count;
4793        int ret;
4794
4795        /*
4796         * The complete header will include an array of its 64-bit
4797         * snapshot ids, followed by the names of those snapshots as
4798         * a contiguous block of NUL-terminated strings.  Note that
4799         * the number of snapshots could change by the time we read
4800         * it in, in which case we re-read it.
4801         */
4802        do {
4803                size_t size;
4804
4805                kfree(ondisk);
4806
4807                size = sizeof (*ondisk);
4808                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4809                size += names_size;
4810                ondisk = kmalloc(size, GFP_KERNEL);
4811                if (!ondisk)
4812                        return -ENOMEM;
4813
4814                ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4815                                        &rbd_dev->header_oloc, ondisk, size);
4816                if (ret < 0)
4817                        goto out;
4818                if ((size_t)ret < size) {
4819                        ret = -ENXIO;
4820                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4821                                size, ret);
4822                        goto out;
4823                }
4824                if (!rbd_dev_ondisk_valid(ondisk)) {
4825                        ret = -ENXIO;
4826                        rbd_warn(rbd_dev, "invalid header");
4827                        goto out;
4828                }
4829
4830                names_size = le64_to_cpu(ondisk->snap_names_len);
4831                want_count = snap_count;
4832                snap_count = le32_to_cpu(ondisk->snap_count);
4833        } while (snap_count != want_count);
4834
4835        ret = rbd_header_from_disk(rbd_dev, ondisk);
4836out:
4837        kfree(ondisk);
4838
4839        return ret;
4840}
4841
4842static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4843{
4844        sector_t size;
4845
4846        /*
4847         * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4848         * try to update its size.  If REMOVING is set, updating size
4849         * is just useless work since the device can't be opened.
4850         */
4851        if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4852            !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4853                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4854                dout("setting size to %llu sectors", (unsigned long long)size);
4855                set_capacity_and_notify(rbd_dev->disk, size);
4856        }
4857}
4858
4859static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4860{
4861        u64 mapping_size;
4862        int ret;
4863
4864        down_write(&rbd_dev->header_rwsem);
4865        mapping_size = rbd_dev->mapping.size;
4866
4867        ret = rbd_dev_header_info(rbd_dev);
4868        if (ret)
4869                goto out;
4870
4871        /*
4872         * If there is a parent, see if it has disappeared due to the
4873         * mapped image getting flattened.
4874         */
4875        if (rbd_dev->parent) {
4876                ret = rbd_dev_v2_parent_info(rbd_dev);
4877                if (ret)
4878                        goto out;
4879        }
4880
4881        rbd_assert(!rbd_is_snap(rbd_dev));
4882        rbd_dev->mapping.size = rbd_dev->header.image_size;
4883
4884out:
4885        up_write(&rbd_dev->header_rwsem);
4886        if (!ret && mapping_size != rbd_dev->mapping.size)
4887                rbd_dev_update_size(rbd_dev);
4888
4889        return ret;
4890}
4891
4892static const struct blk_mq_ops rbd_mq_ops = {
4893        .queue_rq       = rbd_queue_rq,
4894};
4895
4896static int rbd_init_disk(struct rbd_device *rbd_dev)
4897{
4898        struct gendisk *disk;
4899        struct request_queue *q;
4900        unsigned int objset_bytes =
4901            rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
4902        int err;
4903
4904        memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4905        rbd_dev->tag_set.ops = &rbd_mq_ops;
4906        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4907        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4908        rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4909        rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4910        rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
4911
4912        err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4913        if (err)
4914                return err;
4915
4916        disk = blk_mq_alloc_disk(&rbd_dev->tag_set, rbd_dev);
4917        if (IS_ERR(disk)) {
4918                err = PTR_ERR(disk);
4919                goto out_tag_set;
4920        }
4921        q = disk->queue;
4922
4923        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4924                 rbd_dev->dev_id);
4925        disk->major = rbd_dev->major;
4926        disk->first_minor = rbd_dev->minor;
4927        if (single_major) {
4928                disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT);
4929                disk->flags |= GENHD_FL_EXT_DEVT;
4930        } else {
4931                disk->minors = RBD_MINORS_PER_MAJOR;
4932        }
4933        disk->fops = &rbd_bd_ops;
4934        disk->private_data = rbd_dev;
4935
4936        blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
4937        /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4938
4939        blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
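        /*
         * blk_queue_max_hw_sectors() also caps max_sectors at the block
         * layer's default; lift that cap so a single request can cover a
         * whole object set.
         */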
4940        q->limits.max_sectors = queue_max_hw_sectors(q);
4941        blk_queue_max_segments(q, USHRT_MAX);
4942        blk_queue_max_segment_size(q, UINT_MAX);
4943        blk_queue_io_min(q, rbd_dev->opts->alloc_size);
4944        blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
4945
4946        if (rbd_dev->opts->trim) {
4947                blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
4948                q->limits.discard_granularity = rbd_dev->opts->alloc_size;
4949                blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
4950                blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
4951        }
4952
4953        if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4954                blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
4955
4956        rbd_dev->disk = disk;
4957
4958        return 0;
4959out_tag_set:
4960        blk_mq_free_tag_set(&rbd_dev->tag_set);
4961        return err;
4962}
4963
4964/*
4965  sysfs
4966*/
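
/*
 * Each mapped device exposes the attributes defined below under
 * /sys/bus/rbd/devices/<dev-id>/.  Illustrative reads/writes (assuming a
 * device with id 0):
 *
 *      $ cat /sys/bus/rbd/devices/0/size
 *      $ cat /sys/bus/rbd/devices/0/current_snap
 *      $ echo 1 > /sys/bus/rbd/devices/0/refresh
 */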
4967
4968static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4969{
4970        return container_of(dev, struct rbd_device, dev);
4971}
4972
4973static ssize_t rbd_size_show(struct device *dev,
4974                             struct device_attribute *attr, char *buf)
4975{
4976        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4977
4978        return sprintf(buf, "%llu\n",
4979                (unsigned long long)rbd_dev->mapping.size);
4980}
4981
4982static ssize_t rbd_features_show(struct device *dev,
4983                             struct device_attribute *attr, char *buf)
4984{
4985        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4986
4987        return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
4988}
4989
4990static ssize_t rbd_major_show(struct device *dev,
4991                              struct device_attribute *attr, char *buf)
4992{
4993        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4994
4995        if (rbd_dev->major)
4996                return sprintf(buf, "%d\n", rbd_dev->major);
4997
4998        return sprintf(buf, "(none)\n");
4999}
5000
5001static ssize_t rbd_minor_show(struct device *dev,
5002                              struct device_attribute *attr, char *buf)
5003{
5004        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5005
5006        return sprintf(buf, "%d\n", rbd_dev->minor);
5007}
5008
5009static ssize_t rbd_client_addr_show(struct device *dev,
5010                                    struct device_attribute *attr, char *buf)
5011{
5012        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5013        struct ceph_entity_addr *client_addr =
5014            ceph_client_addr(rbd_dev->rbd_client->client);
5015
5016        return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5017                       le32_to_cpu(client_addr->nonce));
5018}
5019
5020static ssize_t rbd_client_id_show(struct device *dev,
5021                                  struct device_attribute *attr, char *buf)
5022{
5023        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5024
5025        return sprintf(buf, "client%lld\n",
5026                       ceph_client_gid(rbd_dev->rbd_client->client));
5027}
5028
5029static ssize_t rbd_cluster_fsid_show(struct device *dev,
5030                                     struct device_attribute *attr, char *buf)
5031{
5032        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5033
5034        return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5035}
5036
5037static ssize_t rbd_config_info_show(struct device *dev,
5038                                    struct device_attribute *attr, char *buf)
5039{
5040        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5041
5042        if (!capable(CAP_SYS_ADMIN))
5043                return -EPERM;
5044
5045        return sprintf(buf, "%s\n", rbd_dev->config_info);
5046}
5047
5048static ssize_t rbd_pool_show(struct device *dev,
5049                             struct device_attribute *attr, char *buf)
5050{
5051        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5052
5053        return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5054}
5055
5056static ssize_t rbd_pool_id_show(struct device *dev,
5057                             struct device_attribute *attr, char *buf)
5058{
5059        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5060
5061        return sprintf(buf, "%llu\n",
5062                        (unsigned long long) rbd_dev->spec->pool_id);
5063}
5064
5065static ssize_t rbd_pool_ns_show(struct device *dev,
5066                                struct device_attribute *attr, char *buf)
5067{
5068        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5069
5070        return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5071}
5072
5073static ssize_t rbd_name_show(struct device *dev,
5074                             struct device_attribute *attr, char *buf)
5075{
5076        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5077
5078        if (rbd_dev->spec->image_name)
5079                return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5080
5081        return sprintf(buf, "(unknown)\n");
5082}
5083
5084static ssize_t rbd_image_id_show(struct device *dev,
5085                             struct device_attribute *attr, char *buf)
5086{
5087        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5088
5089        return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5090}
5091
5092/*
5093 * Shows the name of the currently-mapped snapshot (or
5094 * RBD_SNAP_HEAD_NAME for the base image).
5095 */
5096static ssize_t rbd_snap_show(struct device *dev,
5097                             struct device_attribute *attr,
5098                             char *buf)
5099{
5100        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5101
5102        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5103}
5104
5105static ssize_t rbd_snap_id_show(struct device *dev,
5106                                struct device_attribute *attr, char *buf)
5107{
5108        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5109
5110        return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5111}
5112
5113/*
5114 * For a v2 image, shows the chain of parent images, separated by empty
5115 * lines.  For v1 images or if there is no parent, shows "(no parent
5116 * image)".
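 *
 * Each block looks like the following (values are placeholders):
 *
 *      pool_id <id>
 *      pool_name <pool>
 *      pool_ns <namespace, may be empty>
 *      image_id <id>
 *      image_name <name, or "(unknown)">
 *      snap_id <id>
 *      snap_name <name>
 *      overlap <bytes>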
5117 */
5118static ssize_t rbd_parent_show(struct device *dev,
5119                               struct device_attribute *attr,
5120                               char *buf)
5121{
5122        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5123        ssize_t count = 0;
5124
5125        if (!rbd_dev->parent)
5126                return sprintf(buf, "(no parent image)\n");
5127
5128        for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5129                struct rbd_spec *spec = rbd_dev->parent_spec;
5130
5131                count += sprintf(&buf[count], "%s"
5132                            "pool_id %llu\npool_name %s\n"
5133                            "pool_ns %s\n"
5134                            "image_id %s\nimage_name %s\n"
5135                            "snap_id %llu\nsnap_name %s\n"
5136                            "overlap %llu\n",
5137                            !count ? "" : "\n", /* first? */
5138                            spec->pool_id, spec->pool_name,
5139                            spec->pool_ns ?: "",
5140                            spec->image_id, spec->image_name ?: "(unknown)",
5141                            spec->snap_id, spec->snap_name,
5142                            rbd_dev->parent_overlap);
5143        }
5144
5145        return count;
5146}
5147
5148static ssize_t rbd_image_refresh(struct device *dev,
5149                                 struct device_attribute *attr,
5150                                 const char *buf,
5151                                 size_t size)
5152{
5153        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5154        int ret;
5155
5156        if (!capable(CAP_SYS_ADMIN))
5157                return -EPERM;
5158
5159        ret = rbd_dev_refresh(rbd_dev);
5160        if (ret)
5161                return ret;
5162
5163        return size;
5164}
5165
5166static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5167static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5168static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5169static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5170static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5171static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5172static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5173static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5174static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5175static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5176static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5177static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5178static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5179static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5180static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5181static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5182static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5183
5184static struct attribute *rbd_attrs[] = {
5185        &dev_attr_size.attr,
5186        &dev_attr_features.attr,
5187        &dev_attr_major.attr,
5188        &dev_attr_minor.attr,
5189        &dev_attr_client_addr.attr,
5190        &dev_attr_client_id.attr,
5191        &dev_attr_cluster_fsid.attr,
5192        &dev_attr_config_info.attr,
5193        &dev_attr_pool.attr,
5194        &dev_attr_pool_id.attr,
5195        &dev_attr_pool_ns.attr,
5196        &dev_attr_name.attr,
5197        &dev_attr_image_id.attr,
5198        &dev_attr_current_snap.attr,
5199        &dev_attr_snap_id.attr,
5200        &dev_attr_parent.attr,
5201        &dev_attr_refresh.attr,
5202        NULL
5203};
5204
5205static struct attribute_group rbd_attr_group = {
5206        .attrs = rbd_attrs,
5207};
5208
5209static const struct attribute_group *rbd_attr_groups[] = {
5210        &rbd_attr_group,
5211        NULL
5212};
5213
5214static void rbd_dev_release(struct device *dev);
5215
5216static const struct device_type rbd_device_type = {
5217        .name           = "rbd",
5218        .groups         = rbd_attr_groups,
5219        .release        = rbd_dev_release,
5220};
5221
5222static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5223{
5224        kref_get(&spec->kref);
5225
5226        return spec;
5227}
5228
5229static void rbd_spec_free(struct kref *kref);
5230static void rbd_spec_put(struct rbd_spec *spec)
5231{
5232        if (spec)
5233                kref_put(&spec->kref, rbd_spec_free);
5234}
5235
5236static struct rbd_spec *rbd_spec_alloc(void)
5237{
5238        struct rbd_spec *spec;
5239
5240        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5241        if (!spec)
5242                return NULL;
5243
5244        spec->pool_id = CEPH_NOPOOL;
5245        spec->snap_id = CEPH_NOSNAP;
5246        kref_init(&spec->kref);
5247
5248        return spec;
5249}
5250
5251static void rbd_spec_free(struct kref *kref)
5252{
5253        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5254
5255        kfree(spec->pool_name);
5256        kfree(spec->pool_ns);
5257        kfree(spec->image_id);
5258        kfree(spec->image_name);
5259        kfree(spec->snap_name);
5260        kfree(spec);
5261}
5262
5263static void rbd_dev_free(struct rbd_device *rbd_dev)
5264{
5265        WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5266        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5267
5268        ceph_oid_destroy(&rbd_dev->header_oid);
5269        ceph_oloc_destroy(&rbd_dev->header_oloc);
5270        kfree(rbd_dev->config_info);
5271
5272        rbd_put_client(rbd_dev->rbd_client);
5273        rbd_spec_put(rbd_dev->spec);
5274        kfree(rbd_dev->opts);
5275        kfree(rbd_dev);
5276}
5277
5278static void rbd_dev_release(struct device *dev)
5279{
5280        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5281        bool need_put = !!rbd_dev->opts;
5282
5283        if (need_put) {
5284                destroy_workqueue(rbd_dev->task_wq);
5285                ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5286        }
5287
5288        rbd_dev_free(rbd_dev);
5289
5290        /*
5291         * This is racy, but way better than putting the module outside of
5292         * the release callback.  The race window is pretty small, so
5293         * doing something similar to dm (dm-builtin.c) is overkill.
5294         */
5295        if (need_put)
5296                module_put(THIS_MODULE);
5297}
5298
5299static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5300                                           struct rbd_spec *spec)
5301{
5302        struct rbd_device *rbd_dev;
5303
5304        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5305        if (!rbd_dev)
5306                return NULL;
5307
5308        spin_lock_init(&rbd_dev->lock);
5309        INIT_LIST_HEAD(&rbd_dev->node);
5310        init_rwsem(&rbd_dev->header_rwsem);
5311
5312        rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5313        ceph_oid_init(&rbd_dev->header_oid);
5314        rbd_dev->header_oloc.pool = spec->pool_id;
5315        if (spec->pool_ns) {
5316                WARN_ON(!*spec->pool_ns);
5317                rbd_dev->header_oloc.pool_ns =
5318                    ceph_find_or_create_string(spec->pool_ns,
5319                                               strlen(spec->pool_ns));
5320        }
5321
5322        mutex_init(&rbd_dev->watch_mutex);
5323        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5324        INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5325
5326        init_rwsem(&rbd_dev->lock_rwsem);
5327        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5328        INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5329        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5330        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5331        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5332        spin_lock_init(&rbd_dev->lock_lists_lock);
5333        INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5334        INIT_LIST_HEAD(&rbd_dev->running_list);
5335        init_completion(&rbd_dev->acquire_wait);
5336        init_completion(&rbd_dev->releasing_wait);
5337
5338        spin_lock_init(&rbd_dev->object_map_lock);
5339
5340        rbd_dev->dev.bus = &rbd_bus_type;
5341        rbd_dev->dev.type = &rbd_device_type;
5342        rbd_dev->dev.parent = &rbd_root_dev;
5343        device_initialize(&rbd_dev->dev);
5344
5345        rbd_dev->rbd_client = rbdc;
5346        rbd_dev->spec = spec;
5347
5348        return rbd_dev;
5349}
5350
5351/*
5352 * Create a mapping rbd_dev.
5353 */
5354static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5355                                         struct rbd_spec *spec,
5356                                         struct rbd_options *opts)
5357{
5358        struct rbd_device *rbd_dev;
5359
5360        rbd_dev = __rbd_dev_create(rbdc, spec);
5361        if (!rbd_dev)
5362                return NULL;
5363
5364        rbd_dev->opts = opts;
5365
5366        /* get an id and fill in device name */
5367        rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5368                                         minor_to_rbd_dev_id(1 << MINORBITS),
5369                                         GFP_KERNEL);
5370        if (rbd_dev->dev_id < 0)
5371                goto fail_rbd_dev;
5372
5373        sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5374        rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5375                                                   rbd_dev->name);
5376        if (!rbd_dev->task_wq)
5377                goto fail_dev_id;
5378
5379        /* we have a ref from do_rbd_add() */
5380        __module_get(THIS_MODULE);
5381
5382        dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5383        return rbd_dev;
5384
5385fail_dev_id:
5386        ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5387fail_rbd_dev:
5388        rbd_dev_free(rbd_dev);
5389        return NULL;
5390}
5391
5392static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5393{
5394        if (rbd_dev)
5395                put_device(&rbd_dev->dev);
5396}
5397
5398/*
5399 * Get the size and object order for an image snapshot, or, if
5400 * snap_id is CEPH_NOSNAP, get this information for the base
5401 * image.
5402 */
5403static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5404                                u8 *order, u64 *snap_size)
5405{
5406        __le64 snapid = cpu_to_le64(snap_id);
5407        int ret;
5408        struct {
5409                u8 order;
5410                __le64 size;
5411        } __attribute__ ((packed)) size_buf = { 0 };
5412
5413        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5414                                  &rbd_dev->header_oloc, "get_size",
5415                                  &snapid, sizeof(snapid),
5416                                  &size_buf, sizeof(size_buf));
5417        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5418        if (ret < 0)
5419                return ret;
5420        if (ret < sizeof (size_buf))
5421                return -ERANGE;
5422
5423        if (order) {
5424                *order = size_buf.order;
5425                dout("  order %u", (unsigned int)*order);
5426        }
5427        *snap_size = le64_to_cpu(size_buf.size);
5428
5429        dout("  snap_id 0x%016llx snap_size = %llu\n",
5430                (unsigned long long)snap_id,
5431                (unsigned long long)*snap_size);
5432
5433        return 0;
5434}
5435
5436static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5437{
5438        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5439                                        &rbd_dev->header.obj_order,
5440                                        &rbd_dev->header.image_size);
5441}
5442
5443static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5444{
5445        size_t size;
5446        void *reply_buf;
5447        int ret;
5448        void *p;
5449
5450        /* Response will be an encoded string, which includes a length */
5451        size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5452        reply_buf = kzalloc(size, GFP_KERNEL);
5453        if (!reply_buf)
5454                return -ENOMEM;
5455
5456        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5457                                  &rbd_dev->header_oloc, "get_object_prefix",
5458                                  NULL, 0, reply_buf, size);
5459        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5460        if (ret < 0)
5461                goto out;
5462
5463        p = reply_buf;
5464        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5465                                                p + ret, NULL, GFP_NOIO);
5466        ret = 0;
5467
5468        if (IS_ERR(rbd_dev->header.object_prefix)) {
5469                ret = PTR_ERR(rbd_dev->header.object_prefix);
5470                rbd_dev->header.object_prefix = NULL;
5471        } else {
5472                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5473        }
5474out:
5475        kfree(reply_buf);
5476
5477        return ret;
5478}
5479
5480static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5481                                     bool read_only, u64 *snap_features)
5482{
5483        struct {
5484                __le64 snap_id;
5485                u8 read_only;
5486        } features_in;
5487        struct {
5488                __le64 features;
5489                __le64 incompat;
5490        } __attribute__ ((packed)) features_buf = { 0 };
5491        u64 unsup;
5492        int ret;
5493
5494        features_in.snap_id = cpu_to_le64(snap_id);
5495        features_in.read_only = read_only;
5496
5497        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5498                                  &rbd_dev->header_oloc, "get_features",
5499                                  &features_in, sizeof(features_in),
5500                                  &features_buf, sizeof(features_buf));
5501        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5502        if (ret < 0)
5503                return ret;
5504        if (ret < sizeof (features_buf))
5505                return -ERANGE;
5506
5507        unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5508        if (unsup) {
5509                rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5510                         unsup);
5511                return -ENXIO;
5512        }
5513
5514        *snap_features = le64_to_cpu(features_buf.features);
5515
5516        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5517                (unsigned long long)snap_id,
5518                (unsigned long long)*snap_features,
5519                (unsigned long long)le64_to_cpu(features_buf.incompat));
5520
5521        return 0;
5522}
5523
5524static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5525{
5526        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5527                                         rbd_is_ro(rbd_dev),
5528                                         &rbd_dev->header.features);
5529}
5530
5531/*
5532 * These are generic image flags, but since they are used only for
5533 * object map, store them in rbd_dev->object_map_flags.
5534 *
5535 * For the same reason, this function is called only on object map
5536 * (re)load and not on header refresh.
5537 */
5538static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5539{
5540        __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5541        __le64 flags;
5542        int ret;
5543
5544        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5545                                  &rbd_dev->header_oloc, "get_flags",
5546                                  &snapid, sizeof(snapid),
5547                                  &flags, sizeof(flags));
5548        if (ret < 0)
5549                return ret;
5550        if (ret < sizeof(flags))
5551                return -EBADMSG;
5552
5553        rbd_dev->object_map_flags = le64_to_cpu(flags);
5554        return 0;
5555}
5556
5557struct parent_image_info {
5558        u64             pool_id;
5559        const char      *pool_ns;
5560        const char      *image_id;
5561        u64             snap_id;
5562
5563        bool            has_overlap;
5564        u64             overlap;
5565};
5566
5567/*
5568 * The caller is responsible for freeing the strings in @pii.
5569 */
5570static int decode_parent_image_spec(void **p, void *end,
5571                                    struct parent_image_info *pii)
5572{
5573        u8 struct_v;
5574        u32 struct_len;
5575        int ret;
5576
5577        ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5578                                  &struct_v, &struct_len);
5579        if (ret)
5580                return ret;
5581
5582        ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5583        pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5584        if (IS_ERR(pii->pool_ns)) {
5585                ret = PTR_ERR(pii->pool_ns);
5586                pii->pool_ns = NULL;
5587                return ret;
5588        }
5589        pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5590        if (IS_ERR(pii->image_id)) {
5591                ret = PTR_ERR(pii->image_id);
5592                pii->image_id = NULL;
5593                return ret;
5594        }
5595        ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5596        return 0;
5597
5598e_inval:
5599        return -EINVAL;
5600}
5601
5602static int __get_parent_info(struct rbd_device *rbd_dev,
5603                             struct page *req_page,
5604                             struct page *reply_page,
5605                             struct parent_image_info *pii)
5606{
5607        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5608        size_t reply_len = PAGE_SIZE;
5609        void *p, *end;
5610        int ret;
5611
5612        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5613                             "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5614                             req_page, sizeof(u64), &reply_page, &reply_len);
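        /*
         * -EOPNOTSUPP means the OSD does not know the "parent_get" method;
         * return 1 so get_parent_info() falls back to the legacy
         * "get_parent" path in __get_parent_info_legacy().
         */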
5615        if (ret)
5616                return ret == -EOPNOTSUPP ? 1 : ret;
5617
5618        p = page_address(reply_page);
5619        end = p + reply_len;
5620        ret = decode_parent_image_spec(&p, end, pii);
5621        if (ret)
5622                return ret;
5623
5624        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5625                             "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5626                             req_page, sizeof(u64), &reply_page, &reply_len);
5627        if (ret)
5628                return ret;
5629
5630        p = page_address(reply_page);
5631        end = p + reply_len;
5632        ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5633        if (pii->has_overlap)
5634                ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5635
5636        return 0;
5637
5638e_inval:
5639        return -EINVAL;
5640}
5641
5642/*
5643 * The caller is responsible for freeing the strings in @pii.
5644 */
5645static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5646                                    struct page *req_page,
5647                                    struct page *reply_page,
5648                                    struct parent_image_info *pii)
5649{
5650        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5651        size_t reply_len = PAGE_SIZE;
5652        void *p, *end;
5653        int ret;
5654
5655        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5656                             "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5657                             req_page, sizeof(u64), &reply_page, &reply_len);
5658        if (ret)
5659                return ret;
5660
5661        p = page_address(reply_page);
5662        end = p + reply_len;
5663        ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5664        pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5665        if (IS_ERR(pii->image_id)) {
5666                ret = PTR_ERR(pii->image_id);
5667                pii->image_id = NULL;
5668                return ret;
5669        }
5670        ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5671        pii->has_overlap = true;
5672        ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5673
5674        return 0;
5675
5676e_inval:
5677        return -EINVAL;
5678}
5679
5680static int get_parent_info(struct rbd_device *rbd_dev,
5681                           struct parent_image_info *pii)
5682{
5683        struct page *req_page, *reply_page;
5684        void *p;
5685        int ret;
5686
5687        req_page = alloc_page(GFP_KERNEL);
5688        if (!req_page)
5689                return -ENOMEM;
5690
5691        reply_page = alloc_page(GFP_KERNEL);
5692        if (!reply_page) {
5693                __free_page(req_page);
5694                return -ENOMEM;
5695        }
5696
5697        p = page_address(req_page);
5698        ceph_encode_64(&p, rbd_dev->spec->snap_id);
5699        ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5700        if (ret > 0)
5701                ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5702                                               pii);
5703
5704        __free_page(req_page);
5705        __free_page(reply_page);
5706        return ret;
5707}
5708
5709static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5710{
5711        struct rbd_spec *parent_spec;
5712        struct parent_image_info pii = { 0 };
5713        int ret;
5714
5715        parent_spec = rbd_spec_alloc();
5716        if (!parent_spec)
5717                return -ENOMEM;
5718
5719        ret = get_parent_info(rbd_dev, &pii);
5720        if (ret)
5721                goto out_err;
5722
5723        dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5724             __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5725             pii.has_overlap, pii.overlap);
5726
5727        if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5728                /*
5729                 * Either the parent never existed, or we have a
5730                 * record of it but the image got flattened so it no
5731                 * longer has a parent.  When the parent of a
5732                 * layered image disappears we immediately set the
5733                 * overlap to 0.  The effect of this is that all new
5734                 * requests will be treated as if the image had no
5735                 * parent.
5736                 *
5737                 * If !pii.has_overlap, the parent image spec is not
5738                 * applicable.  It's there to avoid duplication in each
5739                 * snapshot record.
5740                 */
5741                if (rbd_dev->parent_overlap) {
5742                        rbd_dev->parent_overlap = 0;
5743                        rbd_dev_parent_put(rbd_dev);
5744                        pr_info("%s: clone image has been flattened\n",
5745                                rbd_dev->disk->disk_name);
5746                }
5747
5748                goto out;       /* No parent?  No problem. */
5749        }
5750
5751        /* The ceph file layout needs to fit pool id in 32 bits */
5752
5753        ret = -EIO;
5754        if (pii.pool_id > (u64)U32_MAX) {
5755                rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5756                        (unsigned long long)pii.pool_id, U32_MAX);
5757                goto out_err;
5758        }
5759
5760        /*
5761         * The parent won't change (except when the clone is
5762         * flattened, which is handled above).  So we only need to
5763         * record the parent spec if we have not already done so.
5764         */
5765        if (!rbd_dev->parent_spec) {
5766                parent_spec->pool_id = pii.pool_id;
5767                if (pii.pool_ns && *pii.pool_ns) {
5768                        parent_spec->pool_ns = pii.pool_ns;
5769                        pii.pool_ns = NULL;
5770                }
5771                parent_spec->image_id = pii.image_id;
5772                pii.image_id = NULL;
5773                parent_spec->snap_id = pii.snap_id;
5774
5775                rbd_dev->parent_spec = parent_spec;
5776                parent_spec = NULL;     /* rbd_dev now owns this */
5777        }
5778
5779        /*
5780         * We always update the parent overlap.  If it's zero we issue
5781         * a warning, as we will proceed as if there was no parent.
5782         */
5783        if (!pii.overlap) {
5784                if (parent_spec) {
5785                        /* refresh, careful to warn just once */
5786                        if (rbd_dev->parent_overlap)
5787                                rbd_warn(rbd_dev,
5788                                    "clone now standalone (overlap became 0)");
5789                } else {
5790                        /* initial probe */
5791                        rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5792                }
5793        }
5794        rbd_dev->parent_overlap = pii.overlap;
5795
5796out:
5797        ret = 0;
5798out_err:
5799        kfree(pii.pool_ns);
5800        kfree(pii.image_id);
5801        rbd_spec_put(parent_spec);
5802        return ret;
5803}
5804
5805static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5806{
5807        struct {
5808                __le64 stripe_unit;
5809                __le64 stripe_count;
5810        } __attribute__ ((packed)) striping_info_buf = { 0 };
5811        size_t size = sizeof (striping_info_buf);
5812        void *p;
5813        int ret;
5814
5815        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5816                                &rbd_dev->header_oloc, "get_stripe_unit_count",
5817                                NULL, 0, &striping_info_buf, size);
5818        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5819        if (ret < 0)
5820                return ret;
5821        if (ret < size)
5822                return -ERANGE;
5823
5824        p = &striping_info_buf;
5825        rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5826        rbd_dev->header.stripe_count = ceph_decode_64(&p);
5827        return 0;
5828}
5829
5830static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5831{
5832        __le64 data_pool_id;
5833        int ret;
5834
5835        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5836                                  &rbd_dev->header_oloc, "get_data_pool",
5837                                  NULL, 0, &data_pool_id, sizeof(data_pool_id));
5838        if (ret < 0)
5839                return ret;
5840        if (ret < sizeof(data_pool_id))
5841                return -EBADMSG;
5842
5843        rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5844        WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5845        return 0;
5846}
5847
5848static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5849{
5850        CEPH_DEFINE_OID_ONSTACK(oid);
5851        size_t image_id_size;
5852        char *image_id;
5853        void *p;
5854        void *end;
5855        size_t size;
5856        void *reply_buf = NULL;
5857        size_t len = 0;
5858        char *image_name = NULL;
5859        int ret;
5860
5861        rbd_assert(!rbd_dev->spec->image_name);
5862
5863        len = strlen(rbd_dev->spec->image_id);
5864        image_id_size = sizeof (__le32) + len;
5865        image_id = kmalloc(image_id_size, GFP_KERNEL);
5866        if (!image_id)
5867                return NULL;
5868
5869        p = image_id;
5870        end = image_id + image_id_size;
5871        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5872
5873        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5874        reply_buf = kmalloc(size, GFP_KERNEL);
5875        if (!reply_buf)
5876                goto out;
5877
5878        ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5879        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5880                                  "dir_get_name", image_id, image_id_size,
5881                                  reply_buf, size);
5882        if (ret < 0)
5883                goto out;
5884        p = reply_buf;
5885        end = reply_buf + ret;
5886
5887        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5888        if (IS_ERR(image_name))
5889                image_name = NULL;
5890        else
5891                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5892out:
5893        kfree(reply_buf);
5894        kfree(image_id);
5895
5896        return image_name;
5897}
5898
5899static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5900{
5901        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5902        const char *snap_name;
5903        u32 which = 0;
5904
5905        /* Skip over names until we find the one we are looking for */
5906
5907        snap_name = rbd_dev->header.snap_names;
5908        while (which < snapc->num_snaps) {
5909                if (!strcmp(name, snap_name))
5910                        return snapc->snaps[which];
5911                snap_name += strlen(snap_name) + 1;
5912                which++;
5913        }
5914        return CEPH_NOSNAP;
5915}
5916
5917static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5918{
5919        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5920        u32 which;
5921        bool found = false;
5922        u64 snap_id;
5923
5924        for (which = 0; !found && which < snapc->num_snaps; which++) {
5925                const char *snap_name;
5926
5927                snap_id = snapc->snaps[which];
5928                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5929                if (IS_ERR(snap_name)) {
5930                        /* ignore no-longer existing snapshots */
5931                        if (PTR_ERR(snap_name) == -ENOENT)
5932                                continue;
5933                        else
5934                                break;
5935                }
5936                found = !strcmp(name, snap_name);
5937                kfree(snap_name);
5938        }
5939        return found ? snap_id : CEPH_NOSNAP;
5940}
5941
5942/*
5943 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5944 * no snapshot by that name is found, or if an error occurs.
5945 */
5946static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5947{
5948        if (rbd_dev->image_format == 1)
5949                return rbd_v1_snap_id_by_name(rbd_dev, name);
5950
5951        return rbd_v2_snap_id_by_name(rbd_dev, name);
5952}
5953
5954/*
5955 * An image being mapped will have everything but the snap id.
5956 */
5957static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5958{
5959        struct rbd_spec *spec = rbd_dev->spec;
5960
5961        rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5962        rbd_assert(spec->image_id && spec->image_name);
5963        rbd_assert(spec->snap_name);
5964
5965        if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5966                u64 snap_id;
5967
5968                snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5969                if (snap_id == CEPH_NOSNAP)
5970                        return -ENOENT;
5971
5972                spec->snap_id = snap_id;
5973        } else {
5974                spec->snap_id = CEPH_NOSNAP;
5975        }
5976
5977        return 0;
5978}
5979
5980/*
5981 * A parent image will have all ids but none of the names.
5982 *
5983 * All names in an rbd spec are dynamically allocated.  It's OK if we
5984 * can't figure out the name for an image id.
5985 */
5986static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5987{
5988        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5989        struct rbd_spec *spec = rbd_dev->spec;
5990        const char *pool_name;
5991        const char *image_name;
5992        const char *snap_name;
5993        int ret;
5994
5995        rbd_assert(spec->pool_id != CEPH_NOPOOL);
5996        rbd_assert(spec->image_id);
5997        rbd_assert(spec->snap_id != CEPH_NOSNAP);
5998
5999        /* Get the pool name; we have to make our own copy of this */
6000
6001        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6002        if (!pool_name) {
6003                rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6004                return -EIO;
6005        }
6006        pool_name = kstrdup(pool_name, GFP_KERNEL);
6007        if (!pool_name)
6008                return -ENOMEM;
6009
6010        /* Fetch the image name; tolerate failure here */
6011
6012        image_name = rbd_dev_image_name(rbd_dev);
6013        if (!image_name)
6014                rbd_warn(rbd_dev, "unable to get image name");
6015
6016        /* Fetch the snapshot name */
6017
6018        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6019        if (IS_ERR(snap_name)) {
6020                ret = PTR_ERR(snap_name);
6021                goto out_err;
6022        }
6023
6024        spec->pool_name = pool_name;
6025        spec->image_name = image_name;
6026        spec->snap_name = snap_name;
6027
6028        return 0;
6029
6030out_err:
6031        kfree(image_name);
6032        kfree(pool_name);
6033        return ret;
6034}
6035
6036static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6037{
6038        size_t size;
6039        int ret;
6040        void *reply_buf;
6041        void *p;
6042        void *end;
6043        u64 seq;
6044        u32 snap_count;
6045        struct ceph_snap_context *snapc;
6046        u32 i;
6047
6048        /*
6049         * We'll need room for the seq value (maximum snapshot id),
6050         * snapshot count, and array of that many snapshot ids.
6051         * For now we have a fixed upper limit on the number we're
6052         * prepared to receive.
6053         */
6054        size = sizeof (__le64) + sizeof (__le32) +
6055                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
6056        reply_buf = kzalloc(size, GFP_KERNEL);
6057        if (!reply_buf)
6058                return -ENOMEM;
6059
6060        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6061                                  &rbd_dev->header_oloc, "get_snapcontext",
6062                                  NULL, 0, reply_buf, size);
6063        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6064        if (ret < 0)
6065                goto out;
6066
6067        p = reply_buf;
6068        end = reply_buf + ret;
6069        ret = -ERANGE;
6070        ceph_decode_64_safe(&p, end, seq, out);
6071        ceph_decode_32_safe(&p, end, snap_count, out);
6072
6073        /*
6074         * Make sure the reported number of snapshot ids wouldn't go
6075         * beyond the end of our buffer.  But before checking that,
6076         * make sure the computed size of the snapshot context we
6077         * allocate is representable in a size_t.
6078         */
6079        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6080                                 / sizeof (u64)) {
6081                ret = -EINVAL;
6082                goto out;
6083        }
6084        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6085                goto out;
6086        ret = 0;
6087
6088        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6089        if (!snapc) {
6090                ret = -ENOMEM;
6091                goto out;
6092        }
6093        snapc->seq = seq;
6094        for (i = 0; i < snap_count; i++)
6095                snapc->snaps[i] = ceph_decode_64(&p);
6096
6097        ceph_put_snap_context(rbd_dev->header.snapc);
6098        rbd_dev->header.snapc = snapc;
6099
6100        dout("  snap context seq = %llu, snap_count = %u\n",
6101                (unsigned long long)seq, (unsigned int)snap_count);
6102out:
6103        kfree(reply_buf);
6104
6105        return ret;
6106}
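/*
 * For reference (an annotation derived from the ceph_decode_*() calls
 * above), the get_snapcontext reply is laid out as, all little-endian:
 *
 *	__le64 seq;                  maximum snapshot id, as noted above
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];
 *
 * The decode only relies on snap_count matching the length of snaps[];
 * any ordering of the ids within snaps[] is up to the OSD class method.
 */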
6107
6108static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6109                                        u64 snap_id)
6110{
6111        size_t size;
6112        void *reply_buf;
6113        __le64 snapid;
6114        int ret;
6115        void *p;
6116        void *end;
6117        char *snap_name;
6118
6119        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6120        reply_buf = kmalloc(size, GFP_KERNEL);
6121        if (!reply_buf)
6122                return ERR_PTR(-ENOMEM);
6123
6124        snapid = cpu_to_le64(snap_id);
6125        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6126                                  &rbd_dev->header_oloc, "get_snapshot_name",
6127                                  &snapid, sizeof(snapid), reply_buf, size);
6128        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6129        if (ret < 0) {
6130                snap_name = ERR_PTR(ret);
6131                goto out;
6132        }
6133
6134        p = reply_buf;
6135        end = reply_buf + ret;
6136        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6137        if (IS_ERR(snap_name))
6138                goto out;
6139
6140        dout("  snap_id 0x%016llx snap_name = %s\n",
6141                (unsigned long long)snap_id, snap_name);
6142out:
6143        kfree(reply_buf);
6144
6145        return snap_name;
6146}
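/*
 * Note on the reply handled above: get_snapshot_name returns a single
 * ceph-encoded string (a __le32 length followed by that many bytes, with
 * no terminating NUL), which is why the buffer is sized as
 * sizeof(__le32) + RBD_MAX_SNAP_NAME_LEN and handed to
 * ceph_extract_encoded_string() to obtain a NUL-terminated copy.
 */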
6147
6148static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6149{
6150        bool first_time = rbd_dev->header.object_prefix == NULL;
6151        int ret;
6152
6153        ret = rbd_dev_v2_image_size(rbd_dev);
6154        if (ret)
6155                return ret;
6156
6157        if (first_time) {
6158                ret = rbd_dev_v2_header_onetime(rbd_dev);
6159                if (ret)
6160                        return ret;
6161        }
6162
6163        ret = rbd_dev_v2_snap_context(rbd_dev);
6164        if (ret && first_time) {
6165                kfree(rbd_dev->header.object_prefix);
6166                rbd_dev->header.object_prefix = NULL;
6167        }
6168
6169        return ret;
6170}
6171
6172static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6173{
6174        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6175
6176        if (rbd_dev->image_format == 1)
6177                return rbd_dev_v1_header_info(rbd_dev);
6178
6179        return rbd_dev_v2_header_info(rbd_dev);
6180}
6181
6182/*
6183 * Skips over white space at *buf, and updates *buf to point to the
6184 * first found non-space character (if any). Returns the length of
6185 * the token (string of non-white space characters) found.  Note
6186 * that *buf must be terminated with '\0'.
6187 */
6188static inline size_t next_token(const char **buf)
6189{
6190        /*
6191         * These are the characters that produce nonzero for
6192         * isspace() in the "C" and "POSIX" locales.
6193         */
6194        const char *spaces = " \f\n\r\t\v";
6195
6196        *buf += strspn(*buf, spaces);   /* Find start of token */
6197
6198        return strcspn(*buf, spaces);   /* Return token length */
6199}
6200
6201/*
6202 * Finds the next token in *buf, dynamically allocates a buffer big
6203 * enough to hold a copy of it, and copies the token into the new
6204 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6205 * that a duplicate buffer is created even for a zero-length token.
6206 *
6207 * Returns a pointer to the newly-allocated duplicate, or a null
6208 * pointer if memory for the duplicate was not available.  If
6209 * the lenp argument is a non-null pointer, the length of the token
6210 * (not including the '\0') is returned in *lenp.
6211 *
6212 * If successful, the *buf pointer will be updated to point beyond
6213 * the end of the found token.
6214 *
6215 * Note: uses GFP_KERNEL for allocation.
6216 */
6217static inline char *dup_token(const char **buf, size_t *lenp)
6218{
6219        char *dup;
6220        size_t len;
6221
6222        len = next_token(buf);
6223        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6224        if (!dup)
6225                return NULL;
6226        *(dup + len) = '\0';
6227        *buf += len;
6228
6229        if (lenp)
6230                *lenp = len;
6231
6232        return dup;
6233}
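/*
 * A minimal sketch (guarded by #if 0, never built; the function name and
 * buffer contents are made up purely for illustration) of how next_token()
 * and dup_token() cooperate when walking an "rbd add" style buffer.
 */
#if 0
static void token_walk_example(void)
{
	const char *buf = "1.2.3.4:6789 name=admin rbd myimage";
	size_t len;
	char *options;

	/* First token: caller keeps a pointer/length and consumes it itself */
	len = next_token(&buf);		/* len == strlen("1.2.3.4:6789") */
	buf += len;

	/* Second token: duplicated into a NUL-terminated buffer */
	options = dup_token(&buf, NULL);	/* "name=admin", or NULL on -ENOMEM */
	kfree(options);				/* kfree(NULL) is a no-op */
}
#endif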
6234
6235static int rbd_parse_param(struct fs_parameter *param,
6236                            struct rbd_parse_opts_ctx *pctx)
6237{
6238        struct rbd_options *opt = pctx->opts;
6239        struct fs_parse_result result;
6240        struct p_log log = {.prefix = "rbd"};
6241        int token, ret;
6242
6243        ret = ceph_parse_param(param, pctx->copts, NULL);
6244        if (ret != -ENOPARAM)
6245                return ret;
6246
6247        token = __fs_parse(&log, rbd_parameters, param, &result);
6248        dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6249        if (token < 0) {
6250                if (token == -ENOPARAM)
6251                        return inval_plog(&log, "Unknown parameter '%s'",
6252                                          param->key);
6253                return token;
6254        }
6255
6256        switch (token) {
6257        case Opt_queue_depth:
6258                if (result.uint_32 < 1)
6259                        goto out_of_range;
6260                opt->queue_depth = result.uint_32;
6261                break;
6262        case Opt_alloc_size:
6263                if (result.uint_32 < SECTOR_SIZE)
6264                        goto out_of_range;
6265                if (!is_power_of_2(result.uint_32))
6266                        return inval_plog(&log, "alloc_size must be a power of 2");
6267                opt->alloc_size = result.uint_32;
6268                break;
6269        case Opt_lock_timeout:
6270                /* 0 is "wait forever" (i.e. infinite timeout) */
6271                if (result.uint_32 > INT_MAX / 1000)
6272                        goto out_of_range;
6273                opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6274                break;
6275        case Opt_pool_ns:
6276                kfree(pctx->spec->pool_ns);
6277                pctx->spec->pool_ns = param->string;
6278                param->string = NULL;
6279                break;
6280        case Opt_compression_hint:
6281                switch (result.uint_32) {
6282                case Opt_compression_hint_none:
6283                        opt->alloc_hint_flags &=
6284                            ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6285                              CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6286                        break;
6287                case Opt_compression_hint_compressible:
6288                        opt->alloc_hint_flags |=
6289                            CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6290                        opt->alloc_hint_flags &=
6291                            ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6292                        break;
6293                case Opt_compression_hint_incompressible:
6294                        opt->alloc_hint_flags |=
6295                            CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6296                        opt->alloc_hint_flags &=
6297                            ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6298                        break;
6299                default:
6300                        BUG();
6301                }
6302                break;
6303        case Opt_read_only:
6304                opt->read_only = true;
6305                break;
6306        case Opt_read_write:
6307                opt->read_only = false;
6308                break;
6309        case Opt_lock_on_read:
6310                opt->lock_on_read = true;
6311                break;
6312        case Opt_exclusive:
6313                opt->exclusive = true;
6314                break;
6315        case Opt_notrim:
6316                opt->trim = false;
6317                break;
6318        default:
6319                BUG();
6320        }
6321
6322        return 0;
6323
6324out_of_range:
6325        return inval_plog(&log, "%s out of range", param->key);
6326}
6327
6328/*
6329 * This duplicates most of generic_parse_monolithic(), untying it from
6330 * fs_context and skipping standard superblock and security options.
6331 */
6332static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6333{
6334        char *key;
6335        int ret = 0;
6336
6337        dout("%s '%s'\n", __func__, options);
6338        while ((key = strsep(&options, ",")) != NULL) {
6339                if (*key) {
6340                        struct fs_parameter param = {
6341                                .key    = key,
6342                                .type   = fs_value_is_flag,
6343                        };
6344                        char *value = strchr(key, '=');
6345                        size_t v_len = 0;
6346
6347                        if (value) {
6348                                if (value == key)
6349                                        continue;
6350                                *value++ = 0;
6351                                v_len = strlen(value);
6352                                param.string = kmemdup_nul(value, v_len,
6353                                                           GFP_KERNEL);
6354                                if (!param.string)
6355                                        return -ENOMEM;
6356                                param.type = fs_value_is_string;
6357                        }
6358                        param.size = v_len;
6359
6360                        ret = rbd_parse_param(&param, pctx);
6361                        kfree(param.string);
6362                        if (ret)
6363                                break;
6364                }
6365        }
6366
6367        return ret;
6368}
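/*
 * For illustration of the two-stage parsing above: an options string such
 * as "name=admin,queue_depth=16,alloc_size=65536,read_only" is split on
 * ',' by rbd_parse_options(); each key is first offered to
 * ceph_parse_param() (which would claim a libceph key like "name",
 * assuming libceph recognizes it), and only keys rejected with -ENOPARAM
 * fall through to __fs_parse() and the rbd_options switch above
 * (queue_depth, alloc_size, read_only).
 */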
6369
6370/*
6371 * Parse the options provided for an "rbd add" (i.e., rbd image
6372 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6373 * and the data written is passed here via a NUL-terminated buffer.
6374 * Returns 0 if successful or an error code otherwise.
6375 *
6376 * The information extracted from these options is recorded in
6377 * the other parameters which return dynamically-allocated
6378 * structures:
6379 *  ceph_opts
6380 *      The address of a pointer that will refer to a ceph options
6381 *      structure.  Caller must release the returned pointer using
6382 *      ceph_destroy_options() when it is no longer needed.
6383 *  rbd_opts
6384 *      Address of an rbd options pointer.  Fully initialized by
6385 *      this function; caller must release with kfree().
6386 *  spec
6387 *      Address of an rbd image specification pointer.  Fully
6388 *      initialized by this function based on parsed options.
6389 *      Caller must release with rbd_spec_put().
6390 *
6391 * The options passed take this form:
6392 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6393 * where:
6394 *  <mon_addrs>
6395 *      A comma-separated list of one or more monitor addresses.
6396 *      A monitor address is an ip address, optionally followed
6397 *      by a port number (separated by a colon).
6398 *        I.e.:  ip1[:port1][,ip2[:port2]...]
6399 *  <options>
6400 *      A comma-separated list of ceph and/or rbd options.
6401 *  <pool_name>
6402 *      The name of the rados pool containing the rbd image.
6403 *  <image_name>
6404 *      The name of the image in that pool to map.
6405 *  <snap_name>
6406 *      An optional snapshot name.  If provided, the mapping will
6407 *      present data from the image at the time that snapshot was
6408 *      created.  The image head is used if no snapshot name is
6409 *      provided.  Snapshot mappings are always read-only.
6410 */
6411static int rbd_add_parse_args(const char *buf,
6412                                struct ceph_options **ceph_opts,
6413                                struct rbd_options **opts,
6414                                struct rbd_spec **rbd_spec)
6415{
6416        size_t len;
6417        char *options;
6418        const char *mon_addrs;
6419        char *snap_name;
6420        size_t mon_addrs_size;
6421        struct rbd_parse_opts_ctx pctx = { 0 };
6422        int ret;
6423
6424        /* The first four tokens are required */
6425
6426        len = next_token(&buf);
6427        if (!len) {
6428                rbd_warn(NULL, "no monitor address(es) provided");
6429                return -EINVAL;
6430        }
6431        mon_addrs = buf;
6432        mon_addrs_size = len;
6433        buf += len;
6434
6435        ret = -EINVAL;
6436        options = dup_token(&buf, NULL);
6437        if (!options)
6438                return -ENOMEM;
6439        if (!*options) {
6440                rbd_warn(NULL, "no options provided");
6441                goto out_err;
6442        }
6443
6444        pctx.spec = rbd_spec_alloc();
6445        if (!pctx.spec)
6446                goto out_mem;
6447
6448        pctx.spec->pool_name = dup_token(&buf, NULL);
6449        if (!pctx.spec->pool_name)
6450                goto out_mem;
6451        if (!*pctx.spec->pool_name) {
6452                rbd_warn(NULL, "no pool name provided");
6453                goto out_err;
6454        }
6455
6456        pctx.spec->image_name = dup_token(&buf, NULL);
6457        if (!pctx.spec->image_name)
6458                goto out_mem;
6459        if (!*pctx.spec->image_name) {
6460                rbd_warn(NULL, "no image name provided");
6461                goto out_err;
6462        }
6463
6464        /*
6465         * Snapshot name is optional; default is to use "-"
6466         * (indicating the head/no snapshot).
6467         */
6468        len = next_token(&buf);
6469        if (!len) {
6470                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6471                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6472        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6473                ret = -ENAMETOOLONG;
6474                goto out_err;
6475        }
6476        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6477        if (!snap_name)
6478                goto out_mem;
6479        *(snap_name + len) = '\0';
6480        pctx.spec->snap_name = snap_name;
6481
6482        pctx.copts = ceph_alloc_options();
6483        if (!pctx.copts)
6484                goto out_mem;
6485
6486        /* Initialize all rbd options to the defaults */
6487
6488        pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6489        if (!pctx.opts)
6490                goto out_mem;
6491
6492        pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6493        pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6494        pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6495        pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6496        pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6497        pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6498        pctx.opts->trim = RBD_TRIM_DEFAULT;
6499
6500        ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
6501        if (ret)
6502                goto out_err;
6503
6504        ret = rbd_parse_options(options, &pctx);
6505        if (ret)
6506                goto out_err;
6507
6508        *ceph_opts = pctx.copts;
6509        *opts = pctx.opts;
6510        *rbd_spec = pctx.spec;
6511        kfree(options);
6512        return 0;
6513
6514out_mem:
6515        ret = -ENOMEM;
6516out_err:
6517        kfree(pctx.opts);
6518        ceph_destroy_options(pctx.copts);
6519        rbd_spec_put(pctx.spec);
6520        kfree(options);
6521        return ret;
6522}
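/*
 * Example (shell, not C) of a buffer this function parses, as written to
 * /sys/bus/rbd/add -- see Documentation/ABI/testing/sysfs-bus-rbd; the
 * option keys here are only illustrative:
 *
 *	$ echo "1.2.3.4:6789 name=admin,queue_depth=16 rbd myimage mysnap" \
 *		> /sys/bus/rbd/add
 *
 * "1.2.3.4:6789" is the monitor list, "name=admin,queue_depth=16" the
 * options, "rbd" the pool, "myimage" the image and "mysnap" an optional
 * snapshot name (omit it to map the image head).
 */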
6523
6524static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6525{
6526        down_write(&rbd_dev->lock_rwsem);
6527        if (__rbd_is_lock_owner(rbd_dev))
6528                __rbd_release_lock(rbd_dev);
6529        up_write(&rbd_dev->lock_rwsem);
6530}
6531
6532/*
6533 * If the wait is interrupted, an error is returned even if the lock
6534 * was successfully acquired.  rbd_dev_image_unlock() will release it
6535 * if needed.
6536 */
6537static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6538{
6539        long ret;
6540
6541        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6542                if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6543                        return 0;
6544
6545                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6546                return -EINVAL;
6547        }
6548
6549        if (rbd_is_ro(rbd_dev))
6550                return 0;
6551
6552        rbd_assert(!rbd_is_lock_owner(rbd_dev));
6553        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6554        ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6555                            ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6556        if (ret > 0) {
6557                ret = rbd_dev->acquire_err;
6558        } else {
6559                cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6560                if (!ret)
6561                        ret = -ETIMEDOUT;
6562        }
6563
6564        if (ret) {
6565                rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6566                return ret;
6567        }
6568
6569        /*
6570         * The lock may have been released by now, unless automatic lock
6571         * transitions are disabled.
6572         */
6573        rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6574        return 0;
6575}
6576
6577/*
6578 * An rbd format 2 image has a unique identifier, distinct from the
6579 * name given to it by the user.  Internally, that identifier is
6580 * what's used to specify the names of objects related to the image.
6581 *
6582 * A special "rbd id" object is used to map an rbd image name to its
6583 * id.  If that object doesn't exist, then there is no v2 rbd image
6584 * with the supplied name.
6585 *
6586 * This function will record the given rbd_dev's image_id field if
6587 * it can be determined, and in that case will return 0.  If any
6588 * errors occur a negative errno will be returned and the rbd_dev's
6589 * image_id field will be unchanged (and should be NULL).
6590 */
6591static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6592{
6593        int ret;
6594        size_t size;
6595        CEPH_DEFINE_OID_ONSTACK(oid);
6596        void *response;
6597        char *image_id;
6598
6599        /*
6600         * When probing a parent image, the image id is already
6601         * known (and the image name likely is not).  There's no
6602         * need to fetch the image id again in this case.  We
6603         * do still need to set the image format though.
6604         */
6605        if (rbd_dev->spec->image_id) {
6606                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6607
6608                return 0;
6609        }
6610
6611        /*
6612         * First, see if the format 2 image id object exists, and if
6613         * so, get the image's persistent id from it.
6614         */
6615        ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6616                               rbd_dev->spec->image_name);
6617        if (ret)
6618                return ret;
6619
6620        dout("rbd id object name is %s\n", oid.name);
6621
6622        /* Response will be an encoded string, which includes a length */
6623        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6624        response = kzalloc(size, GFP_NOIO);
6625        if (!response) {
6626                ret = -ENOMEM;
6627                goto out;
6628        }
6629
6630        /* If it doesn't exist we'll assume it's a format 1 image */
6631
6632        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6633                                  "get_id", NULL, 0,
6634                                  response, size);
6635        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6636        if (ret == -ENOENT) {
6637                image_id = kstrdup("", GFP_KERNEL);
6638                ret = image_id ? 0 : -ENOMEM;
6639                if (!ret)
6640                        rbd_dev->image_format = 1;
6641        } else if (ret >= 0) {
6642                void *p = response;
6643
6644                image_id = ceph_extract_encoded_string(&p, p + ret,
6645                                                NULL, GFP_NOIO);
6646                ret = PTR_ERR_OR_ZERO(image_id);
6647                if (!ret)
6648                        rbd_dev->image_format = 2;
6649        }
6650
6651        if (!ret) {
6652                rbd_dev->spec->image_id = image_id;
6653                dout("image_id is %s\n", image_id);
6654        }
6655out:
6656        kfree(response);
6657        ceph_oid_destroy(&oid);
6658        return ret;
6659}
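/*
 * For example, probing an image named "myimage" reads the object
 * "rbd_id.myimage" (assuming RBD_ID_PREFIX in rbd_types.h is "rbd_id.");
 * its get_id reply is the ceph-encoded image id.  If that object does not
 * exist, the image is treated as format 1 and the image id stays "".
 */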
6660
6661/*
6662 * Undo whatever state changes are made by a v1 or v2 header info
6663 * call.
6664 */
6665static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6666{
6667        struct rbd_image_header *header;
6668
6669        rbd_dev_parent_put(rbd_dev);
6670        rbd_object_map_free(rbd_dev);
6671        rbd_dev_mapping_clear(rbd_dev);
6672
6673        /* Free dynamic fields from the header, then zero it out */
6674
6675        header = &rbd_dev->header;
6676        ceph_put_snap_context(header->snapc);
6677        kfree(header->snap_sizes);
6678        kfree(header->snap_names);
6679        kfree(header->object_prefix);
6680        memset(header, 0, sizeof (*header));
6681}
6682
6683static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6684{
6685        int ret;
6686
6687        ret = rbd_dev_v2_object_prefix(rbd_dev);
6688        if (ret)
6689                goto out_err;
6690
6691        /*
6692         * Get and check the features for the image.  Currently the
6693         * features are assumed to never change.
6694         */
6695        ret = rbd_dev_v2_features(rbd_dev);
6696        if (ret)
6697                goto out_err;
6698
6699        /* If the image supports fancy striping, get its parameters */
6700
6701        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6702                ret = rbd_dev_v2_striping_info(rbd_dev);
6703                if (ret < 0)
6704                        goto out_err;
6705        }
6706
6707        if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6708                ret = rbd_dev_v2_data_pool(rbd_dev);
6709                if (ret)
6710                        goto out_err;
6711        }
6712
6713        rbd_init_layout(rbd_dev);
6714        return 0;
6715
6716out_err:
6717        rbd_dev->header.features = 0;
6718        kfree(rbd_dev->header.object_prefix);
6719        rbd_dev->header.object_prefix = NULL;
6720        return ret;
6721}
6722
6723/*
6724 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6725 * rbd_dev_image_probe() recursion depth, which means it's also the
6726 * length of the already discovered part of the parent chain.
6727 */
6728static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6729{
6730        struct rbd_device *parent = NULL;
6731        int ret;
6732
6733        if (!rbd_dev->parent_spec)
6734                return 0;
6735
6736        if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6737                pr_info("parent chain is too long (%d)\n", depth);
6738                ret = -EINVAL;
6739                goto out_err;
6740        }
6741
6742        parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6743        if (!parent) {
6744                ret = -ENOMEM;
6745                goto out_err;
6746        }
6747
6748        /*
6749         * Images related by parent/child relationships always share
6750         * rbd_client and spec/parent_spec, so bump their refcounts.
6751         */
6752        __rbd_get_client(rbd_dev->rbd_client);
6753        rbd_spec_get(rbd_dev->parent_spec);
6754
6755        __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6756
6757        ret = rbd_dev_image_probe(parent, depth);
6758        if (ret < 0)
6759                goto out_err;
6760
6761        rbd_dev->parent = parent;
6762        atomic_set(&rbd_dev->parent_ref, 1);
6763        return 0;
6764
6765out_err:
6766        rbd_dev_unparent(rbd_dev);
6767        rbd_dev_destroy(parent);
6768        return ret;
6769}
6770
6771static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6772{
6773        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6774        rbd_free_disk(rbd_dev);
6775        if (!single_major)
6776                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6777}
6778
6779/*
6780 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6781 * upon return.
6782 */
6783static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6784{
6785        int ret;
6786
6787        /* Record our major and minor device numbers. */
6788
6789        if (!single_major) {
6790                ret = register_blkdev(0, rbd_dev->name);
6791                if (ret < 0)
6792                        goto err_out_unlock;
6793
6794                rbd_dev->major = ret;
6795                rbd_dev->minor = 0;
6796        } else {
6797                rbd_dev->major = rbd_major;
6798                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6799        }
6800
6801        /* Set up the blkdev mapping. */
6802
6803        ret = rbd_init_disk(rbd_dev);
6804        if (ret)
6805                goto err_out_blkdev;
6806
6807        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6808        set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6809
6810        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6811        if (ret)
6812                goto err_out_disk;
6813
6814        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6815        up_write(&rbd_dev->header_rwsem);
6816        return 0;
6817
6818err_out_disk:
6819        rbd_free_disk(rbd_dev);
6820err_out_blkdev:
6821        if (!single_major)
6822                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6823err_out_unlock:
6824        up_write(&rbd_dev->header_rwsem);
6825        return ret;
6826}
6827
6828static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6829{
6830        struct rbd_spec *spec = rbd_dev->spec;
6831        int ret;
6832
6833        /* Record the header object name for this rbd image. */
6834
6835        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6836        if (rbd_dev->image_format == 1)
6837                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6838                                       spec->image_name, RBD_SUFFIX);
6839        else
6840                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6841                                       RBD_HEADER_PREFIX, spec->image_id);
6842
6843        return ret;
6844}
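/*
 * Illustration of the two naming schemes built above, assuming the usual
 * rbd_types.h values RBD_SUFFIX = ".rbd" and RBD_HEADER_PREFIX =
 * "rbd_header.": a format 1 image named "myimage" uses header object
 * "myimage.rbd", while a format 2 image with id "1234abcd" uses
 * "rbd_header.1234abcd".
 */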
6845
6846static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6847{
6848        if (!is_snap) {
6849                pr_info("image %s/%s%s%s does not exist\n",
6850                        rbd_dev->spec->pool_name,
6851                        rbd_dev->spec->pool_ns ?: "",
6852                        rbd_dev->spec->pool_ns ? "/" : "",
6853                        rbd_dev->spec->image_name);
6854        } else {
6855                pr_info("snap %s/%s%s%s@%s does not exist\n",
6856                        rbd_dev->spec->pool_name,
6857                        rbd_dev->spec->pool_ns ?: "",
6858                        rbd_dev->spec->pool_ns ? "/" : "",
6859                        rbd_dev->spec->image_name,
6860                        rbd_dev->spec->snap_name);
6861        }
6862}
6863
6864static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6865{
6866        if (!rbd_is_ro(rbd_dev))
6867                rbd_unregister_watch(rbd_dev);
6868
6869        rbd_dev_unprobe(rbd_dev);
6870        rbd_dev->image_format = 0;
6871        kfree(rbd_dev->spec->image_id);
6872        rbd_dev->spec->image_id = NULL;
6873}
6874
6875/*
6876 * Probe for the existence of the header object for the given rbd
6877 * device.  If this image is the one being mapped (i.e., not a
6878 * parent), initiate a watch on its header object before using that
6879 * object to get detailed information about the rbd image.
6880 *
6881 * On success, returns with header_rwsem held for write if called
6882 * with @depth == 0.
6883 */
6884static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6885{
6886        bool need_watch = !rbd_is_ro(rbd_dev);
6887        int ret;
6888
6889        /*
6890         * Get the id from the image id object.  Unless there's an
6891         * error, rbd_dev->spec->image_id will be filled in with
6892         * a dynamically-allocated string, and rbd_dev->image_format
6893         * will be set to either 1 or 2.
6894         */
6895        ret = rbd_dev_image_id(rbd_dev);
6896        if (ret)
6897                return ret;
6898
6899        ret = rbd_dev_header_name(rbd_dev);
6900        if (ret)
6901                goto err_out_format;
6902
6903        if (need_watch) {
6904                ret = rbd_register_watch(rbd_dev);
6905                if (ret) {
6906                        if (ret == -ENOENT)
6907                                rbd_print_dne(rbd_dev, false);
6908                        goto err_out_format;
6909                }
6910        }
6911
6912        if (!depth)
6913                down_write(&rbd_dev->header_rwsem);
6914
6915        ret = rbd_dev_header_info(rbd_dev);
6916        if (ret) {
6917                if (ret == -ENOENT && !need_watch)
6918                        rbd_print_dne(rbd_dev, false);
6919                goto err_out_probe;
6920        }
6921
6922        /*
6923         * If this image is the one being mapped, we have pool name and
6924         * id, image name and id, and snap name - need to fill snap id.
6925         * Otherwise this is a parent image, identified by pool, image
6926         * and snap ids - need to fill in names for those ids.
6927         */
6928        if (!depth)
6929                ret = rbd_spec_fill_snap_id(rbd_dev);
6930        else
6931                ret = rbd_spec_fill_names(rbd_dev);
6932        if (ret) {
6933                if (ret == -ENOENT)
6934                        rbd_print_dne(rbd_dev, true);
6935                goto err_out_probe;
6936        }
6937
6938        ret = rbd_dev_mapping_set(rbd_dev);
6939        if (ret)
6940                goto err_out_probe;
6941
6942        if (rbd_is_snap(rbd_dev) &&
6943            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6944                ret = rbd_object_map_load(rbd_dev);
6945                if (ret)
6946                        goto err_out_probe;
6947        }
6948
6949        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6950                ret = rbd_dev_v2_parent_info(rbd_dev);
6951                if (ret)
6952                        goto err_out_probe;
6953        }
6954
6955        ret = rbd_dev_probe_parent(rbd_dev, depth);
6956        if (ret)
6957                goto err_out_probe;
6958
6959        dout("discovered format %u image, header name is %s\n",
6960                rbd_dev->image_format, rbd_dev->header_oid.name);
6961        return 0;
6962
6963err_out_probe:
6964        if (!depth)
6965                up_write(&rbd_dev->header_rwsem);
6966        if (need_watch)
6967                rbd_unregister_watch(rbd_dev);
6968        rbd_dev_unprobe(rbd_dev);
6969err_out_format:
6970        rbd_dev->image_format = 0;
6971        kfree(rbd_dev->spec->image_id);
6972        rbd_dev->spec->image_id = NULL;
6973        return ret;
6974}
6975
6976static ssize_t do_rbd_add(struct bus_type *bus,
6977                          const char *buf,
6978                          size_t count)
6979{
6980        struct rbd_device *rbd_dev = NULL;
6981        struct ceph_options *ceph_opts = NULL;
6982        struct rbd_options *rbd_opts = NULL;
6983        struct rbd_spec *spec = NULL;
6984        struct rbd_client *rbdc;
6985        int rc;
6986
6987        if (!capable(CAP_SYS_ADMIN))
6988                return -EPERM;
6989
6990        if (!try_module_get(THIS_MODULE))
6991                return -ENODEV;
6992
6993        /* parse add command */
6994        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6995        if (rc < 0)
6996                goto out;
6997
6998        rbdc = rbd_get_client(ceph_opts);
6999        if (IS_ERR(rbdc)) {
7000                rc = PTR_ERR(rbdc);
7001                goto err_out_args;
7002        }
7003
7004        /* pick the pool */
7005        rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7006        if (rc < 0) {
7007                if (rc == -ENOENT)
7008                        pr_info("pool %s does not exist\n", spec->pool_name);
7009                goto err_out_client;
7010        }
7011        spec->pool_id = (u64)rc;
7012
7013        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7014        if (!rbd_dev) {
7015                rc = -ENOMEM;
7016                goto err_out_client;
7017        }
7018        rbdc = NULL;            /* rbd_dev now owns this */
7019        spec = NULL;            /* rbd_dev now owns this */
7020        rbd_opts = NULL;        /* rbd_dev now owns this */
7021
7022        /* if we are mapping a snapshot it will be a read-only mapping */
7023        if (rbd_dev->opts->read_only ||
7024            strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7025                __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7026
7027        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7028        if (!rbd_dev->config_info) {
7029                rc = -ENOMEM;
7030                goto err_out_rbd_dev;
7031        }
7032
7033        rc = rbd_dev_image_probe(rbd_dev, 0);
7034        if (rc < 0)
7035                goto err_out_rbd_dev;
7036
7037        if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7038                rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7039                         rbd_dev->layout.object_size);
7040                rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7041        }
7042
7043        rc = rbd_dev_device_setup(rbd_dev);
7044        if (rc)
7045                goto err_out_image_probe;
7046
7047        rc = rbd_add_acquire_lock(rbd_dev);
7048        if (rc)
7049                goto err_out_image_lock;
7050
7051        /* Everything's ready.  Announce the disk to the world. */
7052
7053        rc = device_add(&rbd_dev->dev);
7054        if (rc)
7055                goto err_out_image_lock;
7056
7057        device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7058
7059        spin_lock(&rbd_dev_list_lock);
7060        list_add_tail(&rbd_dev->node, &rbd_dev_list);
7061        spin_unlock(&rbd_dev_list_lock);
7062
7063        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7064                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7065                rbd_dev->header.features);
7066        rc = count;
7067out:
7068        module_put(THIS_MODULE);
7069        return rc;
7070
7071err_out_image_lock:
7072        rbd_dev_image_unlock(rbd_dev);
7073        rbd_dev_device_release(rbd_dev);
7074err_out_image_probe:
7075        rbd_dev_image_release(rbd_dev);
7076err_out_rbd_dev:
7077        rbd_dev_destroy(rbd_dev);
7078err_out_client:
7079        rbd_put_client(rbdc);
7080err_out_args:
7081        rbd_spec_put(spec);
7082        kfree(rbd_opts);
7083        goto out;
7084}
7085
7086static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7087{
7088        if (single_major)
7089                return -EINVAL;
7090
7091        return do_rbd_add(bus, buf, count);
7092}
7093
7094static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7095                                      size_t count)
7096{
7097        return do_rbd_add(bus, buf, count);
7098}
7099
7100static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7101{
7102        while (rbd_dev->parent) {
7103                struct rbd_device *first = rbd_dev;
7104                struct rbd_device *second = first->parent;
7105                struct rbd_device *third;
7106
7107                /*
7108                 * Follow to the parent with no grandparent and
7109                 * remove it.
7110                 */
7111                while (second && (third = second->parent)) {
7112                        first = second;
7113                        second = third;
7114                }
7115                rbd_assert(second);
7116                rbd_dev_image_release(second);
7117                rbd_dev_destroy(second);
7118                first->parent = NULL;
7119                first->parent_overlap = 0;
7120
7121                rbd_assert(first->parent_spec);
7122                rbd_spec_put(first->parent_spec);
7123                first->parent_spec = NULL;
7124        }
7125}
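/*
 * For example, with a mapped image whose parent chain is
 * mapped -> p1 -> p2 -> p3, each pass of the outer loop above walks down
 * to the deepest parent and releases it, so the images are torn down in
 * the order p3, p2, p1.
 */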
7126
7127static ssize_t do_rbd_remove(struct bus_type *bus,
7128                             const char *buf,
7129                             size_t count)
7130{
7131        struct rbd_device *rbd_dev = NULL;
7132        struct list_head *tmp;
7133        int dev_id;
7134        char opt_buf[6];
7135        bool force = false;
7136        int ret;
7137
7138        if (!capable(CAP_SYS_ADMIN))
7139                return -EPERM;
7140
7141        dev_id = -1;
7142        opt_buf[0] = '\0';
7143        sscanf(buf, "%d %5s", &dev_id, opt_buf);
7144        if (dev_id < 0) {
7145                pr_err("dev_id out of range\n");
7146                return -EINVAL;
7147        }
7148        if (opt_buf[0] != '\0') {
7149                if (!strcmp(opt_buf, "force")) {
7150                        force = true;
7151                } else {
7152                        pr_err("bad remove option at '%s'\n", opt_buf);
7153                        return -EINVAL;
7154                }
7155        }
7156
7157        ret = -ENOENT;
7158        spin_lock(&rbd_dev_list_lock);
7159        list_for_each(tmp, &rbd_dev_list) {
7160                rbd_dev = list_entry(tmp, struct rbd_device, node);
7161                if (rbd_dev->dev_id == dev_id) {
7162                        ret = 0;
7163                        break;
7164                }
7165        }
7166        if (!ret) {
7167                spin_lock_irq(&rbd_dev->lock);
7168                if (rbd_dev->open_count && !force)
7169                        ret = -EBUSY;
7170                else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7171                                          &rbd_dev->flags))
7172                        ret = -EINPROGRESS;
7173                spin_unlock_irq(&rbd_dev->lock);
7174        }
7175        spin_unlock(&rbd_dev_list_lock);
7176        if (ret)
7177                return ret;
7178
7179        if (force) {
7180                /*
7181                 * Prevent new IO from being queued and wait for existing
7182                 * IO to complete/fail.
7183                 */
7184                blk_mq_freeze_queue(rbd_dev->disk->queue);
7185                blk_set_queue_dying(rbd_dev->disk->queue);
7186        }
7187
7188        del_gendisk(rbd_dev->disk);
7189        spin_lock(&rbd_dev_list_lock);
7190        list_del_init(&rbd_dev->node);
7191        spin_unlock(&rbd_dev_list_lock);
7192        device_del(&rbd_dev->dev);
7193
7194        rbd_dev_image_unlock(rbd_dev);
7195        rbd_dev_device_release(rbd_dev);
7196        rbd_dev_image_release(rbd_dev);
7197        rbd_dev_destroy(rbd_dev);
7198        return count;
7199}
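/*
 * Example (shell, not C) of the buffer this function parses, as written
 * to /sys/bus/rbd/remove: the device id assigned at map time, optionally
 * followed by "force" to tear down a mapping that is still open:
 *
 *	$ echo "0" > /sys/bus/rbd/remove
 *	$ echo "0 force" > /sys/bus/rbd/remove
 */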
7200
7201static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7202{
7203        if (single_major)
7204                return -EINVAL;
7205
7206        return do_rbd_remove(bus, buf, count);
7207}
7208
7209static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7210                                         size_t count)
7211{
7212        return do_rbd_remove(bus, buf, count);
7213}
7214
7215/*
7216 * create control files in sysfs
7217 * /sys/bus/rbd/...
7218 */
7219static int __init rbd_sysfs_init(void)
7220{
7221        int ret;
7222
7223        ret = device_register(&rbd_root_dev);
7224        if (ret < 0)
7225                return ret;
7226
7227        ret = bus_register(&rbd_bus_type);
7228        if (ret < 0)
7229                device_unregister(&rbd_root_dev);
7230
7231        return ret;
7232}
7233
7234static void __exit rbd_sysfs_cleanup(void)
7235{
7236        bus_unregister(&rbd_bus_type);
7237        device_unregister(&rbd_root_dev);
7238}
7239
7240static int __init rbd_slab_init(void)
7241{
7242        rbd_assert(!rbd_img_request_cache);
7243        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7244        if (!rbd_img_request_cache)
7245                return -ENOMEM;
7246
7247        rbd_assert(!rbd_obj_request_cache);
7248        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7249        if (!rbd_obj_request_cache)
7250                goto out_err;
7251
7252        return 0;
7253
7254out_err:
7255        kmem_cache_destroy(rbd_img_request_cache);
7256        rbd_img_request_cache = NULL;
7257        return -ENOMEM;
7258}
7259
7260static void rbd_slab_exit(void)
7261{
7262        rbd_assert(rbd_obj_request_cache);
7263        kmem_cache_destroy(rbd_obj_request_cache);
7264        rbd_obj_request_cache = NULL;
7265
7266        rbd_assert(rbd_img_request_cache);
7267        kmem_cache_destroy(rbd_img_request_cache);
7268        rbd_img_request_cache = NULL;
7269}
7270
7271static int __init rbd_init(void)
7272{
7273        int rc;
7274
7275        if (!libceph_compatible(NULL)) {
7276                rbd_warn(NULL, "libceph incompatibility (quitting)");
7277                return -EINVAL;
7278        }
7279
7280        rc = rbd_slab_init();
7281        if (rc)
7282                return rc;
7283
7284        /*
7285         * The number of active work items is limited by the number of
7286         * rbd devices * queue depth, so leave @max_active at default.
7287         */
7288        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7289        if (!rbd_wq) {
7290                rc = -ENOMEM;
7291                goto err_out_slab;
7292        }
7293
7294        if (single_major) {
7295                rbd_major = register_blkdev(0, RBD_DRV_NAME);
7296                if (rbd_major < 0) {
7297                        rc = rbd_major;
7298                        goto err_out_wq;
7299                }
7300        }
7301
7302        rc = rbd_sysfs_init();
7303        if (rc)
7304                goto err_out_blkdev;
7305
7306        if (single_major)
7307                pr_info("loaded (major %d)\n", rbd_major);
7308        else
7309                pr_info("loaded\n");
7310
7311        return 0;
7312
7313err_out_blkdev:
7314        if (single_major)
7315                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7316err_out_wq:
7317        destroy_workqueue(rbd_wq);
7318err_out_slab:
7319        rbd_slab_exit();
7320        return rc;
7321}
7322
7323static void __exit rbd_exit(void)
7324{
7325        ida_destroy(&rbd_dev_id_ida);
7326        rbd_sysfs_cleanup();
7327        if (single_major)
7328                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7329        destroy_workqueue(rbd_wq);
7330        rbd_slab_exit();
7331}
7332
7333module_init(rbd_init);
7334module_exit(rbd_exit);
7335
7336MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7337MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7338MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7339/* following authorship retained from original osdblk.c */
7340MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7341
7342MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7343MODULE_LICENSE("GPL");
7344