linux/drivers/block/rbd.c
<<
>>
Prefs
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/cls_lock_client.h>
  35#include <linux/ceph/striper.h>
  36#include <linux/ceph/decode.h>
  37#include <linux/fs_parser.h>
  38#include <linux/bsearch.h>
  39
  40#include <linux/kernel.h>
  41#include <linux/device.h>
  42#include <linux/module.h>
  43#include <linux/blk-mq.h>
  44#include <linux/fs.h>
  45#include <linux/blkdev.h>
  46#include <linux/slab.h>
  47#include <linux/idr.h>
  48#include <linux/workqueue.h>
  49
  50#include "rbd_types.h"
  51
/*
 * Defining RBD_DEBUG selects the checking variant of rbd_assert() in the
 * "#ifdef RBD_DEBUG" section of this file; without it rbd_assert()
 * expands to ((void) 0).
 */
#define RBD_DEBUG       /* Activate rbd_assert() calls */
  53
  54/*
  55 * Increment the given counter and return its updated value.
  56 * If the counter is already 0 it will not be incremented.
  57 * If the counter is already at its maximum value returns
  58 * -EINVAL without updating it.
  59 */
  60static int atomic_inc_return_safe(atomic_t *v)
  61{
  62        unsigned int counter;
  63
  64        counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
  65        if (counter <= (unsigned int)INT_MAX)
  66                return (int)counter;
  67
  68        atomic_dec(v);
  69
  70        return -EINVAL;
  71}
  72
  73/* Decrement the counter.  Return the resulting value, or -EINVAL */
  74static int atomic_dec_return_safe(atomic_t *v)
  75{
  76        int counter;
  77
  78        counter = atomic_dec_return(v);
  79        if (counter >= 0)
  80                return counter;
  81
  82        atomic_inc(v);
  83
  84        return -EINVAL;
  85}
  86
/* Driver name; rbd_warn() prefixes every message with it. */
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR            256
/* Shift used by rbd_dev_id_to_minor() / minor_to_rbd_dev_id(). */
#define RBD_SINGLE_MAJOR_PART_SHIFT     4

#define RBD_MAX_PARENT_CHAIN_LEN        16

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
/* NAME_MAX minus the prefix length (sizeof - 1 excludes the NUL). */
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
/* One second, expressed in jiffies. */
#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)

/* Feature bits */

/* One bit per image feature; note that bit 6 has no name defined here. */
#define RBD_FEATURE_LAYERING            (1ULL<<0)
#define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
#define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
#define RBD_FEATURE_DATA_POOL           (1ULL<<7)
#define RBD_FEATURE_OPERATIONS          (1ULL<<8)

/* Union of every feature bit named above. */
#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
                                 RBD_FEATURE_OBJECT_MAP |       \
                                 RBD_FEATURE_FAST_DIFF |        \
                                 RBD_FEATURE_DEEP_FLATTEN |     \
                                 RBD_FEATURE_DATA_POOL |        \
                                 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

/* Currently identical to RBD_FEATURES_ALL; reported by supported_features_show(). */
#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN            32
 142
/*
 * block device image metadata (in-memory version)
 *
 * Embedded in struct rbd_device as ->header.  The first group of
 * fields is fixed for the lifetime of the image; the second group
 * may be refreshed.  snap_names and snap_sizes are only used for
 * format 1 images.
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        u64 stripe_unit;
        u64 stripe_count;
        s64 data_pool_id;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};
 161
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        /* Pool identity: numeric id, name and optional namespace. */
        u64             pool_id;
        const char      *pool_name;
        const char      *pool_ns;       /* NULL if default, never "" */

        /* Image identity; image_name may be NULL (see above). */
        const char      *image_id;
        const char      *image_name;

        /* Snapshot identity; rbd_is_snap() compares snap_id to CEPH_NOSNAP. */
        u64             snap_id;
        const char      *snap_name;

        /* Reference count; the structure can be shared (see above). */
        struct kref     kref;
};
 200
/*
 * an instance of the client.  multiple devices may share an rbd client.
 *
 * Instances are linked on rbd_client_list via ->node and are
 * reference counted through ->kref (see rbd_client_find() and
 * rbd_put_client()).
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying libceph client */
        struct kref             kref;           /* released by rbd_client_release() */
        struct list_head        node;           /* entry in rbd_client_list */
};
 209
/*
 * Tracks a set of outstanding sub-operations: how many are still
 * pending and the first error any of them reported.  Updated by
 * pending_result_dec().
 */
struct pending_result {
        int                     result;         /* first nonzero result */
        int                     num_pending;    /* sub-operations not yet completed */
};

/* Forward declaration: rbd_obj_request points back at its image request. */
struct rbd_img_request;
 216
/*
 * Kind of data buffer attached to a request (stored in
 * rbd_img_request.data_type).  Values start at 1, so 0 is not a
 * valid type.
 */
enum obj_request_type {
        OBJ_REQUEST_NODATA = 1,
        OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
        OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
        OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
};
 223
/*
 * Operation carried out by an image request (rbd_img_request.op_type).
 * obj_op_name() maps each value to a printable name.  Values start
 * at 1, so 0 is not a valid operation.
 */
enum obj_operation_type {
        OBJ_OP_READ = 1,
        OBJ_OP_WRITE,
        OBJ_OP_DISCARD,
        OBJ_OP_ZEROOUT,
};
 230
/* Bit flags for rbd_obj_request.flags. */
#define RBD_OBJ_FLAG_DELETION                   (1U << 0)
#define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
#define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
#define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
 236
/*
 * States of an object request when used for a read
 * (rbd_obj_request.read_state).  Values start at 1.
 */
enum rbd_obj_read_state {
        RBD_OBJ_READ_START = 1,
        RBD_OBJ_READ_OBJECT,
        RBD_OBJ_READ_PARENT,
};
 242
/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 *            .                 |                                    .
 *            .                 v                                    .
 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 *            .                 |                    .               .
 *            .                 v                    v (deep-copyup  .
 *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
 * flattened) v                 |                    .               .
 *            .                 v                    .               .
 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 *                              |                        not needed) v
 *                              v                                    .
 *                            done . . . . . . . . . . . . . . . . . .
 *                              ^
 *                              |
 *                     RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * assert_exists guard is needed or not (in some cases it's not needed
 * even if there is a parent).
 *
 * NOTE(review): the state names used in the diagram and paragraph
 * above (_GUARD, _READ_FROM_PARENT, _COPYUP_EMPTY_SNAPC, _COPYUP_OPS,
 * _FLAT) do not appear in the enum below, so this description looks
 * out of date with respect to the enumerators -- confirm and refresh.
 */
/* States of an object request used for a write (rbd_obj_request.write_state). */
enum rbd_obj_write_state {
        RBD_OBJ_WRITE_START = 1,
        RBD_OBJ_WRITE_PRE_OBJECT_MAP,
        RBD_OBJ_WRITE_OBJECT,
        __RBD_OBJ_WRITE_COPYUP,
        RBD_OBJ_WRITE_COPYUP,
        RBD_OBJ_WRITE_POST_OBJECT_MAP,
};
 276
/*
 * States of the copyup sub-machine of an object request
 * (rbd_obj_request.copyup_state).  Values start at 1.
 * NOTE(review): the "__"-prefixed values presumably mark intermediate
 * steps that precede the same-named state -- confirm against the
 * state machine code.
 */
enum rbd_obj_copyup_state {
        RBD_OBJ_COPYUP_START = 1,
        RBD_OBJ_COPYUP_READ_PARENT,
        __RBD_OBJ_COPYUP_OBJECT_MAPS,
        RBD_OBJ_COPYUP_OBJECT_MAPS,
        __RBD_OBJ_COPYUP_WRITE_OBJECT,
        RBD_OBJ_COPYUP_WRITE_OBJECT,
};
 285
/*
 * A request against a single object.
 *
 * Object requests are linked into their image request's
 * object_extents list through ex.oe_item (see for_each_obj_request()).
 * Only one of read_state / write_state is meaningful for a given
 * request, which is why they share storage.
 */
struct rbd_obj_request {
        struct ceph_object_extent ex;
        unsigned int            flags;  /* RBD_OBJ_FLAG_* */
        union {
                enum rbd_obj_read_state  read_state;    /* for reads */
                enum rbd_obj_write_state write_state;   /* for writes */
        };

        struct rbd_img_request  *img_request;   /* owning image request */
        struct ceph_file_extent *img_extents;
        u32                     num_img_extents;

        /* Data position: either a bio iterator or a bio_vec iterator. */
        union {
                struct ceph_bio_iter    bio_pos;
                struct {
                        struct ceph_bvec_iter   bvec_pos;
                        u32                     bvec_count;
                        u32                     bvec_idx;
                };
        };

        enum rbd_obj_copyup_state copyup_state;
        struct bio_vec          *copyup_bvecs;
        u32                     copyup_bvec_count;

        struct list_head        osd_reqs;       /* w/ r_private_item */

        struct mutex            state_mutex;
        struct pending_result   pending;
        struct kref             kref;
};
 317
/*
 * Image request flags.
 * NOTE(review): presumably bit numbers within rbd_img_request.flags
 * (an unsigned long) -- confirm against the users of these values.
 */
enum img_req_flags {
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};
 322
/* States of an image request (rbd_img_request.state).  Values start at 1. */
enum rbd_img_state {
        RBD_IMG_START = 1,
        RBD_IMG_EXCLUSIVE_LOCK,
        __RBD_IMG_OBJECT_REQUESTS,
        RBD_IMG_OBJECT_REQUESTS,
};
 329
/*
 * A request against an image.  It owns a list of per-object requests
 * (object_extents), iterated with for_each_obj_request().
 */
struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        enum obj_operation_type op_type;
        enum obj_request_type   data_type;
        unsigned long           flags;
        enum rbd_img_state      state;
        /* Reads carry a snapshot id, writes a snapshot context. */
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        struct rbd_obj_request  *obj_request;   /* obj req initiator */

        struct list_head        lock_item;
        struct list_head        object_extents; /* obj_req.ex structs */

        struct mutex            state_mutex;
        struct pending_result   pending;
        struct work_struct      work;
        int                     work_result;
};
 350
/*
 * Iterate over the object requests of image request @ireq, with @oreq
 * as the cursor.  The _safe variant takes an extra cursor @n and is
 * built on list_for_each_entry_safe().
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 355
/* Watch state of a device (rbd_device.watch_state). */
enum rbd_watch_state {
        RBD_WATCH_STATE_UNREGISTERED,
        RBD_WATCH_STATE_REGISTERED,
        RBD_WATCH_STATE_ERROR,
};
 361
/*
 * Lock state of a device (rbd_device.lock_state).
 * __rbd_is_lock_owner() treats both LOCKED and RELEASING as
 * "this client owns the lock".
 */
enum rbd_lock_state {
        RBD_LOCK_STATE_UNLOCKED,
        RBD_LOCK_STATE_LOCKED,
        RBD_LOCK_STATE_RELEASING,
};
 367
/* WatchNotify::ClientId */
/* A (gid, handle) pair; rbd_device.owner_cid is of this type. */
struct rbd_client_id {
        u64 gid;
        u64 handle;
};
 373
/* Per-mapping information, embedded in struct rbd_device as ->mapping. */
struct rbd_mapping {
        u64                     size;   /* NOTE(review): presumably bytes -- confirm */
};
 377
/*
 * a single device
 *
 * One instance exists per mapped image.  The structure embeds a
 * struct device (->dev); rbd_open() and rbd_release() take and drop
 * references on it.
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
        struct rbd_options      *opts;
        char                    *config_info;   /* add{,_single_major} string */

        struct ceph_object_id   header_oid;
        struct ceph_object_locator header_oloc;

        struct ceph_file_layout layout;         /* used for all rbd requests */

        struct mutex            watch_mutex;
        enum rbd_watch_state    watch_state;
        struct ceph_osd_linger_request *watch_handle;
        u64                     watch_cookie;
        struct delayed_work     watch_dwork;

        /* lock_rwsem must be held to inspect lock_state (__rbd_is_lock_owner()). */
        struct rw_semaphore     lock_rwsem;
        enum rbd_lock_state     lock_state;
        char                    lock_cookie[32];
        struct rbd_client_id    owner_cid;
        struct work_struct      acquired_lock_work;
        struct work_struct      released_lock_work;
        struct delayed_work     lock_dwork;
        struct work_struct      unlock_work;
        spinlock_t              lock_lists_lock;
        struct list_head        acquiring_list;
        struct list_head        running_list;
        struct completion       acquire_wait;
        int                     acquire_err;
        struct completion       releasing_wait;

        spinlock_t              object_map_lock;
        u8                      *object_map;
        u64                     object_map_size;        /* in objects */
        u64                     object_map_flags;

        struct workqueue_struct *task_wq;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* Block layer tags. */
        struct blk_mq_tag_set   tag_set;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};
 453
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 *
 * These are bit numbers, tested with test_bit() (see rbd_is_ro() and
 * rbd_open()).
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
        RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
};
 464
/* Held by rbd_get_client() across the find-or-create sequence. */
static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* rbd_client_list is walked and modified under rbd_client_list_lock. */
static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Statically allocated snapshot context whose reference count starts
 * at 1; every other member is zero-initialized.
 */
static struct ceph_snap_context rbd_empty_snapc = {
        .nref = REFCOUNT_INIT(1),
};
 486
/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 *
 * Module parameter, exposed with mode 0444.  When false,
 * rbd_bus_is_visible() hides the add_single_major and
 * remove_single_major bus attributes.
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 493
/*
 * Forward declarations: the store handlers are referenced by the
 * BUS_ATTR_WO() definitions in this file before they are defined.
 */
static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
static ssize_t remove_store(struct bus_type *bus, const char *buf,
                            size_t count);
static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
                                      size_t count);
static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
                                         size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 502
 503static int rbd_dev_id_to_minor(int dev_id)
 504{
 505        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
 506}
 507
 508static int minor_to_rbd_dev_id(int minor)
 509{
 510        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 511}
 512
 513static bool rbd_is_ro(struct rbd_device *rbd_dev)
 514{
 515        return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
 516}
 517
 518static bool rbd_is_snap(struct rbd_device *rbd_dev)
 519{
 520        return rbd_dev->spec->snap_id != CEPH_NOSNAP;
 521}
 522
 523static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 524{
 525        lockdep_assert_held(&rbd_dev->lock_rwsem);
 526
 527        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 528               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 529}
 530
 531static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
 532{
 533        bool is_lock_owner;
 534
 535        down_read(&rbd_dev->lock_rwsem);
 536        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
 537        up_read(&rbd_dev->lock_rwsem);
 538        return is_lock_owner;
 539}
 540
 541static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 542{
 543        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 544}
 545
/*
 * Bus attributes: four write-only control files and one read-only
 * file reporting the supported feature mask.
 */
static BUS_ATTR_WO(add);
static BUS_ATTR_WO(remove);
static BUS_ATTR_WO(add_single_major);
static BUS_ATTR_WO(remove_single_major);
static BUS_ATTR_RO(supported_features);

/* NULL-terminated list; visibility is filtered by rbd_bus_is_visible(). */
static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        &bus_attr_supported_features.attr,
        NULL,
};
 560
 561static umode_t rbd_bus_is_visible(struct kobject *kobj,
 562                                  struct attribute *attr, int index)
 563{
 564        if (!single_major &&
 565            (attr == &bus_attr_add_single_major.attr ||
 566             attr == &bus_attr_remove_single_major.attr))
 567                return 0;
 568
 569        return attr->mode;
 570}
 571
/* Attribute group tying rbd_bus_attrs to its visibility callback. */
static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
/* Provides the rbd_bus_groups array referenced by rbd_bus_type below. */
__ATTRIBUTE_GROUPS(rbd_bus);

/* The "rbd" bus, carrying the attribute groups defined above. */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};
 582
/*
 * Release callback for rbd_root_dev.  Intentionally empty:
 * rbd_root_dev is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Statically allocated device named "rbd". */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
 591
 592static __printf(2, 3)
 593void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 594{
 595        struct va_format vaf;
 596        va_list args;
 597
 598        va_start(args, fmt);
 599        vaf.fmt = fmt;
 600        vaf.va = &args;
 601
 602        if (!rbd_dev)
 603                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 604        else if (rbd_dev->disk)
 605                printk(KERN_WARNING "%s: %s: %pV\n",
 606                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 607        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 608                printk(KERN_WARNING "%s: image %s: %pV\n",
 609                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 610        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 611                printk(KERN_WARNING "%s: id %s: %pV\n",
 612                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 613        else    /* punt */
 614                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 615                        RBD_DRV_NAME, rbd_dev, &vaf);
 616        va_end(args);
 617}
 618
 619#ifdef RBD_DEBUG
 620#define rbd_assert(expr)                                                \
 621                if (unlikely(!(expr))) {                                \
 622                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 623                                                "at line %d:\n\n"       \
 624                                        "\trbd_assert(%s);\n\n",        \
 625                                        __func__, __LINE__, #expr);     \
 626                        BUG();                                          \
 627                }
 628#else /* !RBD_DEBUG */
 629#  define rbd_assert(expr)      ((void) 0)
 630#endif /* !RBD_DEBUG */
 631
/* Forward declarations for functions defined later in this file. */
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);

/* Entry points of the object and image request state machines. */
static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
 646
 647/*
 648 * Return true if nothing else is pending.
 649 */
 650static bool pending_result_dec(struct pending_result *pending, int *result)
 651{
 652        rbd_assert(pending->num_pending > 0);
 653
 654        if (*result && !pending->result)
 655                pending->result = *result;
 656        if (--pending->num_pending)
 657                return false;
 658
 659        *result = pending->result;
 660        return true;
 661}
 662
 663static int rbd_open(struct block_device *bdev, fmode_t mode)
 664{
 665        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 666        bool removing = false;
 667
 668        spin_lock_irq(&rbd_dev->lock);
 669        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 670                removing = true;
 671        else
 672                rbd_dev->open_count++;
 673        spin_unlock_irq(&rbd_dev->lock);
 674        if (removing)
 675                return -ENOENT;
 676
 677        (void) get_device(&rbd_dev->dev);
 678
 679        return 0;
 680}
 681
 682static void rbd_release(struct gendisk *disk, fmode_t mode)
 683{
 684        struct rbd_device *rbd_dev = disk->private_data;
 685        unsigned long open_count_before;
 686
 687        spin_lock_irq(&rbd_dev->lock);
 688        open_count_before = rbd_dev->open_count--;
 689        spin_unlock_irq(&rbd_dev->lock);
 690        rbd_assert(open_count_before > 0);
 691
 692        put_device(&rbd_dev->dev);
 693}
 694
 695static int rbd_set_read_only(struct block_device *bdev, bool ro)
 696{
 697        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 698
 699        /*
 700         * Both images mapped read-only and snapshots can't be marked
 701         * read-write.
 702         */
 703        if (!ro) {
 704                if (rbd_is_ro(rbd_dev))
 705                        return -EROFS;
 706
 707                rbd_assert(!rbd_is_snap(rbd_dev));
 708        }
 709
 710        return 0;
 711}
 712
/* Block device operations implemented by this driver. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
        .set_read_only          = rbd_set_read_only,
};
 719
 720/*
 721 * Initialize an rbd client instance.  Success or not, this function
 722 * consumes ceph_opts.  Caller holds client_mutex.
 723 */
 724static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 725{
 726        struct rbd_client *rbdc;
 727        int ret = -ENOMEM;
 728
 729        dout("%s:\n", __func__);
 730        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 731        if (!rbdc)
 732                goto out_opt;
 733
 734        kref_init(&rbdc->kref);
 735        INIT_LIST_HEAD(&rbdc->node);
 736
 737        rbdc->client = ceph_create_client(ceph_opts, rbdc);
 738        if (IS_ERR(rbdc->client))
 739                goto out_rbdc;
 740        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 741
 742        ret = ceph_open_session(rbdc->client);
 743        if (ret < 0)
 744                goto out_client;
 745
 746        spin_lock(&rbd_client_list_lock);
 747        list_add_tail(&rbdc->node, &rbd_client_list);
 748        spin_unlock(&rbd_client_list_lock);
 749
 750        dout("%s: rbdc %p\n", __func__, rbdc);
 751
 752        return rbdc;
 753out_client:
 754        ceph_destroy_client(rbdc->client);
 755out_rbdc:
 756        kfree(rbdc);
 757out_opt:
 758        if (ceph_opts)
 759                ceph_destroy_options(ceph_opts);
 760        dout("%s: error %d\n", __func__, ret);
 761
 762        return ERR_PTR(ret);
 763}
 764
 765static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 766{
 767        kref_get(&rbdc->kref);
 768
 769        return rbdc;
 770}
 771
 772/*
 773 * Find a ceph client with specific addr and configuration.  If
 774 * found, bump its reference count.
 775 */
 776static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 777{
 778        struct rbd_client *client_node;
 779        bool found = false;
 780
 781        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 782                return NULL;
 783
 784        spin_lock(&rbd_client_list_lock);
 785        list_for_each_entry(client_node, &rbd_client_list, node) {
 786                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 787                        __rbd_get_client(client_node);
 788
 789                        found = true;
 790                        break;
 791                }
 792        }
 793        spin_unlock(&rbd_client_list_lock);
 794
 795        return found ? client_node : NULL;
 796}
 797
/*
 * (Per device) rbd map options
 *
 * Option tokens referenced by the rbd_parameters table.  The
 * enumerators are grouped by argument kind, as marked by the
 * separator comments; those after the last separator are used with
 * fsparam_flag() and take no argument.
 */
enum {
        Opt_queue_depth,
        Opt_alloc_size,
        Opt_lock_timeout,
        /* int args above */
        Opt_pool_ns,
        Opt_compression_hint,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        Opt_lock_on_read,
        Opt_exclusive,
        Opt_notrim,
};
 815
/* Values the "compression_hint" option can take. */
enum {
        Opt_compression_hint_none,
        Opt_compression_hint_compressible,
        Opt_compression_hint_incompressible,
};

/* Maps the accepted "compression_hint" strings to the values above. */
static const struct constant_table rbd_param_compression_hint[] = {
        {"none",                Opt_compression_hint_none},
        {"compressible",        Opt_compression_hint_compressible},
        {"incompressible",      Opt_compression_hint_incompressible},
        {}
};
 828
/*
 * Table of recognized per-device map options, terminated by an empty
 * entry.  "read_only"/"ro" share Opt_read_only and
 * "read_write"/"rw" share Opt_read_write.
 */
static const struct fs_parameter_spec rbd_parameters[] = {
        fsparam_u32     ("alloc_size",                  Opt_alloc_size),
        fsparam_enum    ("compression_hint",            Opt_compression_hint,
                         rbd_param_compression_hint),
        fsparam_flag    ("exclusive",                   Opt_exclusive),
        fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
        fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
        fsparam_flag    ("notrim",                      Opt_notrim),
        fsparam_string  ("_pool_ns",                    Opt_pool_ns),
        fsparam_u32     ("queue_depth",                 Opt_queue_depth),
        fsparam_flag    ("read_only",                   Opt_read_only),
        fsparam_flag    ("read_write",                  Opt_read_write),
        fsparam_flag    ("ro",                          Opt_read_only),
        fsparam_flag    ("rw",                          Opt_read_write),
        {}
};
 845
/*
 * Per-device map options in parsed form; rbd_device.opts points at
 * one of these.  The RBD_*_DEFAULT macros below give the default for
 * the correspondingly named field.
 */
struct rbd_options {
        int     queue_depth;
        int     alloc_size;
        unsigned long   lock_timeout;
        bool    read_only;
        bool    lock_on_read;
        bool    exclusive;
        bool    trim;

        u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
};

#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT   false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT   false
#define RBD_TRIM_DEFAULT        true
 865
/*
 * Bundle of the three structures that option parsing fills in.
 * NOTE(review): presumably passed to the option-parsing helpers as a
 * single context argument -- confirm against the parser code.
 */
struct rbd_parse_opts_ctx {
        struct rbd_spec         *spec;
        struct ceph_options     *copts;
        struct rbd_options      *opts;
};
 871
 872static char* obj_op_name(enum obj_operation_type op_type)
 873{
 874        switch (op_type) {
 875        case OBJ_OP_READ:
 876                return "read";
 877        case OBJ_OP_WRITE:
 878                return "write";
 879        case OBJ_OP_DISCARD:
 880                return "discard";
 881        case OBJ_OP_ZEROOUT:
 882                return "zeroout";
 883        default:
 884                return "???";
 885        }
 886}
 887
 888/*
 889 * Destroy ceph client
 890 *
 891 * Caller must hold rbd_client_list_lock.
 892 */
 893static void rbd_client_release(struct kref *kref)
 894{
 895        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 896
 897        dout("%s: rbdc %p\n", __func__, rbdc);
 898        spin_lock(&rbd_client_list_lock);
 899        list_del(&rbdc->node);
 900        spin_unlock(&rbd_client_list_lock);
 901
 902        ceph_destroy_client(rbdc->client);
 903        kfree(rbdc);
 904}
 905
 906/*
 907 * Drop reference to ceph client node. If it's not referenced anymore, release
 908 * it.
 909 */
 910static void rbd_put_client(struct rbd_client *rbdc)
 911{
 912        if (rbdc)
 913                kref_put(&rbdc->kref, rbd_client_release);
 914}
 915
 916/*
 917 * Get a ceph client with specific addr and configuration, if one does
 918 * not exist create it.  Either way, ceph_opts is consumed by this
 919 * function.
 920 */
 921static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 922{
 923        struct rbd_client *rbdc;
 924        int ret;
 925
 926        mutex_lock(&client_mutex);
 927        rbdc = rbd_client_find(ceph_opts);
 928        if (rbdc) {
 929                ceph_destroy_options(ceph_opts);
 930
 931                /*
 932                 * Using an existing client.  Make sure ->pg_pools is up to
 933                 * date before we look up the pool id in do_rbd_add().
 934                 */
 935                ret = ceph_wait_for_latest_osdmap(rbdc->client,
 936                                        rbdc->client->options->mount_timeout);
 937                if (ret) {
 938                        rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
 939                        rbd_put_client(rbdc);
 940                        rbdc = ERR_PTR(ret);
 941                }
 942        } else {
 943                rbdc = rbd_client_create(ceph_opts);
 944        }
 945        mutex_unlock(&client_mutex);
 946
 947        return rbdc;
 948}
 949
 950static bool rbd_image_format_valid(u32 image_format)
 951{
 952        return image_format == 1 || image_format == 2;
 953}
 954
 955static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 956{
 957        size_t size;
 958        u32 snap_count;
 959
 960        /* The header has to start with the magic rbd header text */
 961        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 962                return false;
 963
 964        /* The bio layer requires at least sector-sized I/O */
 965
 966        if (ondisk->options.order < SECTOR_SHIFT)
 967                return false;
 968
 969        /* If we use u64 in a few spots we may be able to loosen this */
 970
 971        if (ondisk->options.order > 8 * sizeof (int) - 1)
 972                return false;
 973
 974        /*
 975         * The size of a snapshot header has to fit in a size_t, and
 976         * that limits the number of snapshots.
 977         */
 978        snap_count = le32_to_cpu(ondisk->snap_count);
 979        size = SIZE_MAX - sizeof (struct ceph_snap_context);
 980        if (snap_count > size / sizeof (__le64))
 981                return false;
 982
 983        /*
 984         * Not only that, but the size of the entire the snapshot
 985         * header must also be representable in a size_t.
 986         */
 987        size -= snap_count * sizeof (__le64);
 988        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
 989                return false;
 990
 991        return true;
 992}
 993
 994/*
 995 * returns the size of an object in the image
 996 */
 997static u32 rbd_obj_bytes(struct rbd_image_header *header)
 998{
 999        return 1U << header->obj_order;
1000}
1001
1002static void rbd_init_layout(struct rbd_device *rbd_dev)
1003{
1004        if (rbd_dev->header.stripe_unit == 0 ||
1005            rbd_dev->header.stripe_count == 0) {
1006                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1007                rbd_dev->header.stripe_count = 1;
1008        }
1009
1010        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1011        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1012        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1013        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1014                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1015        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1016}
1017
1018/*
1019 * Fill an rbd image header with information from the given format 1
1020 * on-disk header.
1021 */
1022static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1023                                 struct rbd_image_header_ondisk *ondisk)
1024{
1025        struct rbd_image_header *header = &rbd_dev->header;
1026        bool first_time = header->object_prefix == NULL;
1027        struct ceph_snap_context *snapc;
1028        char *object_prefix = NULL;
1029        char *snap_names = NULL;
1030        u64 *snap_sizes = NULL;
1031        u32 snap_count;
1032        int ret = -ENOMEM;
1033        u32 i;
1034
1035        /* Allocate this now to avoid having to handle failure below */
1036
1037        if (first_time) {
1038                object_prefix = kstrndup(ondisk->object_prefix,
1039                                         sizeof(ondisk->object_prefix),
1040                                         GFP_KERNEL);
1041                if (!object_prefix)
1042                        return -ENOMEM;
1043        }
1044
1045        /* Allocate the snapshot context and fill it in */
1046
1047        snap_count = le32_to_cpu(ondisk->snap_count);
1048        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1049        if (!snapc)
1050                goto out_err;
1051        snapc->seq = le64_to_cpu(ondisk->snap_seq);
1052        if (snap_count) {
1053                struct rbd_image_snap_ondisk *snaps;
1054                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1055
1056                /* We'll keep a copy of the snapshot names... */
1057
1058                if (snap_names_len > (u64)SIZE_MAX)
1059                        goto out_2big;
1060                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1061                if (!snap_names)
1062                        goto out_err;
1063
1064                /* ...as well as the array of their sizes. */
1065                snap_sizes = kmalloc_array(snap_count,
1066                                           sizeof(*header->snap_sizes),
1067                                           GFP_KERNEL);
1068                if (!snap_sizes)
1069                        goto out_err;
1070
1071                /*
1072                 * Copy the names, and fill in each snapshot's id
1073                 * and size.
1074                 *
1075                 * Note that rbd_dev_v1_header_info() guarantees the
1076                 * ondisk buffer we're working with has
1077                 * snap_names_len bytes beyond the end of the
1078                 * snapshot id array, this memcpy() is safe.
1079                 */
1080                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1081                snaps = ondisk->snaps;
1082                for (i = 0; i < snap_count; i++) {
1083                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1084                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1085                }
1086        }
1087
1088        /* We won't fail any more, fill in the header */
1089
1090        if (first_time) {
1091                header->object_prefix = object_prefix;
1092                header->obj_order = ondisk->options.order;
1093                rbd_init_layout(rbd_dev);
1094        } else {
1095                ceph_put_snap_context(header->snapc);
1096                kfree(header->snap_names);
1097                kfree(header->snap_sizes);
1098        }
1099
1100        /* The remaining fields always get updated (when we refresh) */
1101
1102        header->image_size = le64_to_cpu(ondisk->image_size);
1103        header->snapc = snapc;
1104        header->snap_names = snap_names;
1105        header->snap_sizes = snap_sizes;
1106
1107        return 0;
1108out_2big:
1109        ret = -EIO;
1110out_err:
1111        kfree(snap_sizes);
1112        kfree(snap_names);
1113        ceph_put_snap_context(snapc);
1114        kfree(object_prefix);
1115
1116        return ret;
1117}
1118
1119static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1120{
1121        const char *snap_name;
1122
1123        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1124
1125        /* Skip over names until we find the one we are looking for */
1126
1127        snap_name = rbd_dev->header.snap_names;
1128        while (which--)
1129                snap_name += strlen(snap_name) + 1;
1130
1131        return kstrdup(snap_name, GFP_KERNEL);
1132}
1133
1134/*
1135 * Snapshot id comparison function for use with qsort()/bsearch().
1136 * Note that result is for snapshots in *descending* order.
1137 */
1138static int snapid_compare_reverse(const void *s1, const void *s2)
1139{
1140        u64 snap_id1 = *(u64 *)s1;
1141        u64 snap_id2 = *(u64 *)s2;
1142
1143        if (snap_id1 < snap_id2)
1144                return 1;
1145        return snap_id1 == snap_id2 ? 0 : -1;
1146}
1147
1148/*
1149 * Search a snapshot context to see if the given snapshot id is
1150 * present.
1151 *
1152 * Returns the position of the snapshot id in the array if it's found,
1153 * or BAD_SNAP_INDEX otherwise.
1154 *
1155 * Note: The snapshot array is in kept sorted (by the osd) in
1156 * reverse order, highest snapshot id first.
1157 */
1158static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1159{
1160        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1161        u64 *found;
1162
1163        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1164                                sizeof (snap_id), snapid_compare_reverse);
1165
1166        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1167}
1168
1169static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1170                                        u64 snap_id)
1171{
1172        u32 which;
1173        const char *snap_name;
1174
1175        which = rbd_dev_snap_index(rbd_dev, snap_id);
1176        if (which == BAD_SNAP_INDEX)
1177                return ERR_PTR(-ENOENT);
1178
1179        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1180        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1181}
1182
1183static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1184{
1185        if (snap_id == CEPH_NOSNAP)
1186                return RBD_SNAP_HEAD_NAME;
1187
1188        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1189        if (rbd_dev->image_format == 1)
1190                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1191
1192        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1193}
1194
1195static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1196                                u64 *snap_size)
1197{
1198        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1199        if (snap_id == CEPH_NOSNAP) {
1200                *snap_size = rbd_dev->header.image_size;
1201        } else if (rbd_dev->image_format == 1) {
1202                u32 which;
1203
1204                which = rbd_dev_snap_index(rbd_dev, snap_id);
1205                if (which == BAD_SNAP_INDEX)
1206                        return -ENOENT;
1207
1208                *snap_size = rbd_dev->header.snap_sizes[which];
1209        } else {
1210                u64 size = 0;
1211                int ret;
1212
1213                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1214                if (ret)
1215                        return ret;
1216
1217                *snap_size = size;
1218        }
1219        return 0;
1220}
1221
1222static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1223{
1224        u64 snap_id = rbd_dev->spec->snap_id;
1225        u64 size = 0;
1226        int ret;
1227
1228        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1229        if (ret)
1230                return ret;
1231
1232        rbd_dev->mapping.size = size;
1233        return 0;
1234}
1235
/* Reset the mapping size recorded by rbd_dev_mapping_set() to zero. */
static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
}
1240
1241static void zero_bvec(struct bio_vec *bv)
1242{
1243        void *buf;
1244        unsigned long flags;
1245
1246        buf = bvec_kmap_irq(bv, &flags);
1247        memset(buf, 0, bv->bv_len);
1248        flush_dcache_page(bv->bv_page);
1249        bvec_kunmap_irq(buf, &flags);
1250}
1251
1252static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1253{
1254        struct ceph_bio_iter it = *bio_pos;
1255
1256        ceph_bio_iter_advance(&it, off);
1257        ceph_bio_iter_advance_step(&it, bytes, ({
1258                zero_bvec(&bv);
1259        }));
1260}
1261
1262static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1263{
1264        struct ceph_bvec_iter it = *bvec_pos;
1265
1266        ceph_bvec_iter_advance(&it, off);
1267        ceph_bvec_iter_advance_step(&it, bytes, ({
1268                zero_bvec(&bv);
1269        }));
1270}
1271
1272/*
1273 * Zero a range in @obj_req data buffer defined by a bio (list) or
1274 * (private) bio_vec array.
1275 *
1276 * @off is relative to the start of the data buffer.
1277 */
1278static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1279                               u32 bytes)
1280{
1281        dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1282
1283        switch (obj_req->img_request->data_type) {
1284        case OBJ_REQUEST_BIO:
1285                zero_bios(&obj_req->bio_pos, off, bytes);
1286                break;
1287        case OBJ_REQUEST_BVECS:
1288        case OBJ_REQUEST_OWN_BVECS:
1289                zero_bvecs(&obj_req->bvec_pos, off, bytes);
1290                break;
1291        default:
1292                BUG();
1293        }
1294}
1295
/* Defined further down; needed here as the kref release callback. */
static void rbd_obj_request_destroy(struct kref *kref);

/*
 * Drop one reference to @obj_request (must not be NULL).  When the
 * last reference is dropped, rbd_obj_request_destroy() runs.
 */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                kref_read(&obj_request->kref));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1304
1305static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1306                                        struct rbd_obj_request *obj_request)
1307{
1308        rbd_assert(obj_request->img_request == NULL);
1309
1310        /* Image request now owns object's original reference */
1311        obj_request->img_request = img_request;
1312        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1313}
1314
/*
 * Detach @obj_request from @img_request: unlink it from the list it is
 * on (via ex.oe_item) and drop the reference the image request held.
 * The object request must belong to @img_request.
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        list_del(&obj_request->ex.oe_item);
        rbd_assert(obj_request->img_request == img_request);
        /* May free obj_request; do not touch it afterwards */
        rbd_obj_request_put(obj_request);
}
1323
1324static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1325{
1326        struct rbd_obj_request *obj_req = osd_req->r_priv;
1327
1328        dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1329             __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1330             obj_req->ex.oe_off, obj_req->ex.oe_len);
1331        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1332}
1333
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
/* Mark @img_request as layered by setting IMG_REQ_LAYERED in its flags. */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
}
1343
1344static bool img_request_layered_test(struct rbd_img_request *img_request)
1345{
1346        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1347}
1348
1349static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1350{
1351        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1352
1353        return !obj_req->ex.oe_off &&
1354               obj_req->ex.oe_len == rbd_dev->layout.object_size;
1355}
1356
1357static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1358{
1359        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1360
1361        return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1362                                        rbd_dev->layout.object_size;
1363}
1364
1365/*
1366 * Must be called after rbd_obj_calc_img_extents().
1367 */
1368static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1369{
1370        if (!obj_req->num_img_extents ||
1371            (rbd_obj_is_entire(obj_req) &&
1372             !obj_req->img_request->snapc->num_snaps))
1373                return false;
1374
1375        return true;
1376}
1377
1378static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1379{
1380        return ceph_file_extents_bytes(obj_req->img_extents,
1381                                       obj_req->num_img_extents);
1382}
1383
1384static bool rbd_img_is_write(struct rbd_img_request *img_req)
1385{
1386        switch (img_req->op_type) {
1387        case OBJ_OP_READ:
1388                return false;
1389        case OBJ_OP_WRITE:
1390        case OBJ_OP_DISCARD:
1391        case OBJ_OP_ZEROOUT:
1392                return true;
1393        default:
1394                BUG();
1395        }
1396}
1397
1398static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1399{
1400        struct rbd_obj_request *obj_req = osd_req->r_priv;
1401        int result;
1402
1403        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1404             osd_req->r_result, obj_req);
1405
1406        /*
1407         * Writes aren't allowed to return a data payload.  In some
1408         * guarded write cases (e.g. stat + zero on an empty object)
1409         * a stat response makes it through, but we don't care.
1410         */
1411        if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1412                result = 0;
1413        else
1414                result = osd_req->r_result;
1415
1416        rbd_obj_handle_request(obj_req, result);
1417}
1418
1419static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1420{
1421        struct rbd_obj_request *obj_request = osd_req->r_priv;
1422        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1423        struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1424
1425        osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1426        osd_req->r_snapid = obj_request->img_request->snap_id;
1427}
1428
1429static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1430{
1431        struct rbd_obj_request *obj_request = osd_req->r_priv;
1432
1433        osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1434        ktime_get_real_ts64(&osd_req->r_mtime);
1435        osd_req->r_data_offset = obj_request->ex.oe_off;
1436}
1437
1438static struct ceph_osd_request *
1439__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1440                          struct ceph_snap_context *snapc, int num_ops)
1441{
1442        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1443        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1444        struct ceph_osd_request *req;
1445        const char *name_format = rbd_dev->image_format == 1 ?
1446                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1447        int ret;
1448
1449        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1450        if (!req)
1451                return ERR_PTR(-ENOMEM);
1452
1453        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1454        req->r_callback = rbd_osd_req_callback;
1455        req->r_priv = obj_req;
1456
1457        /*
1458         * Data objects may be stored in a separate pool, but always in
1459         * the same namespace in that pool as the header in its pool.
1460         */
1461        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1462        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1463
1464        ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1465                               rbd_dev->header.object_prefix,
1466                               obj_req->ex.oe_objno);
1467        if (ret)
1468                return ERR_PTR(ret);
1469
1470        return req;
1471}
1472
1473static struct ceph_osd_request *
1474rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1475{
1476        return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1477                                         num_ops);
1478}
1479
1480static struct rbd_obj_request *rbd_obj_request_create(void)
1481{
1482        struct rbd_obj_request *obj_request;
1483
1484        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1485        if (!obj_request)
1486                return NULL;
1487
1488        ceph_object_extent_init(&obj_request->ex);
1489        INIT_LIST_HEAD(&obj_request->osd_reqs);
1490        mutex_init(&obj_request->state_mutex);
1491        kref_init(&obj_request->kref);
1492
1493        dout("%s %p\n", __func__, obj_request);
1494        return obj_request;
1495}
1496
1497static void rbd_obj_request_destroy(struct kref *kref)
1498{
1499        struct rbd_obj_request *obj_request;
1500        struct ceph_osd_request *osd_req;
1501        u32 i;
1502
1503        obj_request = container_of(kref, struct rbd_obj_request, kref);
1504
1505        dout("%s: obj %p\n", __func__, obj_request);
1506
1507        while (!list_empty(&obj_request->osd_reqs)) {
1508                osd_req = list_first_entry(&obj_request->osd_reqs,
1509                                    struct ceph_osd_request, r_private_item);
1510                list_del_init(&osd_req->r_private_item);
1511                ceph_osdc_put_request(osd_req);
1512        }
1513
1514        switch (obj_request->img_request->data_type) {
1515        case OBJ_REQUEST_NODATA:
1516        case OBJ_REQUEST_BIO:
1517        case OBJ_REQUEST_BVECS:
1518                break;          /* Nothing to do */
1519        case OBJ_REQUEST_OWN_BVECS:
1520                kfree(obj_request->bvec_pos.bvecs);
1521                break;
1522        default:
1523                BUG();
1524        }
1525
1526        kfree(obj_request->img_extents);
1527        if (obj_request->copyup_bvecs) {
1528                for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1529                        if (obj_request->copyup_bvecs[i].bv_page)
1530                                __free_page(obj_request->copyup_bvecs[i].bv_page);
1531                }
1532                kfree(obj_request->copyup_bvecs);
1533        }
1534
1535        kmem_cache_free(rbd_obj_request_cache, obj_request);
1536}
1537
/*
 * Tear down the parent-related state of @rbd_dev: remove the parent
 * device, drop the reference on the parent spec and reset the parent
 * spec pointer and overlap.
 *
 * It's OK to call this for a device with no parent.
 */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
        rbd_dev_remove_parent(rbd_dev);
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;
}
1548
1549/*
1550 * Parent image reference counting is used to determine when an
1551 * image's parent fields can be safely torn down--after there are no
1552 * more in-flight requests to the parent image.  When the last
1553 * reference is dropped, cleaning them up is safe.
1554 */
1555static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1556{
1557        int counter;
1558
1559        if (!rbd_dev->parent_spec)
1560                return;
1561
1562        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1563        if (counter > 0)
1564                return;
1565
1566        /* Last reference; clean up parent data structures */
1567
1568        if (!counter)
1569                rbd_dev_unparent(rbd_dev);
1570        else
1571                rbd_warn(rbd_dev, "parent reference underflow");
1572}
1573
1574/*
1575 * If an image has a non-zero parent overlap, get a reference to its
1576 * parent.
1577 *
1578 * Returns true if the rbd device has a parent with a non-zero
1579 * overlap and a reference for it was successfully taken, or
1580 * false otherwise.
1581 */
1582static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1583{
1584        int counter = 0;
1585
1586        if (!rbd_dev->parent_spec)
1587                return false;
1588
1589        if (rbd_dev->parent_overlap)
1590                counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1591
1592        if (counter < 0)
1593                rbd_warn(rbd_dev, "parent reference overflow");
1594
1595        return counter > 0;
1596}
1597
1598static void rbd_img_request_init(struct rbd_img_request *img_request,
1599                                 struct rbd_device *rbd_dev,
1600                                 enum obj_operation_type op_type)
1601{
1602        memset(img_request, 0, sizeof(*img_request));
1603
1604        img_request->rbd_dev = rbd_dev;
1605        img_request->op_type = op_type;
1606
1607        INIT_LIST_HEAD(&img_request->lock_item);
1608        INIT_LIST_HEAD(&img_request->object_extents);
1609        mutex_init(&img_request->state_mutex);
1610}
1611
1612static void rbd_img_capture_header(struct rbd_img_request *img_req)
1613{
1614        struct rbd_device *rbd_dev = img_req->rbd_dev;
1615
1616        lockdep_assert_held(&rbd_dev->header_rwsem);
1617
1618        if (rbd_img_is_write(img_req))
1619                img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1620        else
1621                img_req->snap_id = rbd_dev->spec->snap_id;
1622
1623        if (rbd_dev_parent_get(rbd_dev))
1624                img_request_layered_set(img_req);
1625}
1626
1627static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1628{
1629        struct rbd_obj_request *obj_request;
1630        struct rbd_obj_request *next_obj_request;
1631
1632        dout("%s: img %p\n", __func__, img_request);
1633
1634        WARN_ON(!list_empty(&img_request->lock_item));
1635        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1636                rbd_img_obj_request_del(img_request, obj_request);
1637
1638        if (img_request_layered_test(img_request))
1639                rbd_dev_parent_put(img_request->rbd_dev);
1640
1641        if (rbd_img_is_write(img_request))
1642                ceph_put_snap_context(img_request->snapc);
1643
1644        if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1645                kmem_cache_free(rbd_img_request_cache, img_request);
1646}
1647
/*
 * Object map packing: each object's state occupies BITS_PER_OBJ bits,
 * so one byte holds OBJS_PER_BYTE states; OBJ_MASK selects one state
 * once it has been shifted down to the low bits.
 */
#define BITS_PER_OBJ    2
#define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
#define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1651
1652static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1653                                   u64 *index, u8 *shift)
1654{
1655        u32 off;
1656
1657        rbd_assert(objno < rbd_dev->object_map_size);
1658        *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1659        *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1660}
1661
1662static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1663{
1664        u64 index;
1665        u8 shift;
1666
1667        lockdep_assert_held(&rbd_dev->object_map_lock);
1668        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1669        return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1670}
1671
1672static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1673{
1674        u64 index;
1675        u8 shift;
1676        u8 *p;
1677
1678        lockdep_assert_held(&rbd_dev->object_map_lock);
1679        rbd_assert(!(val & ~OBJ_MASK));
1680
1681        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1682        p = &rbd_dev->object_map[index];
1683        *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1684}
1685
1686static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1687{
1688        u8 state;
1689
1690        spin_lock(&rbd_dev->object_map_lock);
1691        state = __rbd_object_map_get(rbd_dev, objno);
1692        spin_unlock(&rbd_dev->object_map_lock);
1693        return state;
1694}
1695
1696static bool use_object_map(struct rbd_device *rbd_dev)
1697{
1698        /*
1699         * An image mapped read-only can't use the object map -- it isn't
1700         * loaded because the header lock isn't acquired.  Someone else can
1701         * write to the image and update the object map behind our back.
1702         *
1703         * A snapshot can't be written to, so using the object map is always
1704         * safe.
1705         */
1706        if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1707                return false;
1708
1709        return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1710                !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1711}
1712
1713static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1714{
1715        u8 state;
1716
1717        /* fall back to default logic if object map is disabled or invalid */
1718        if (!use_object_map(rbd_dev))
1719                return true;
1720
1721        state = rbd_object_map_get(rbd_dev, objno);
1722        return state != OBJECT_NONEXISTENT;
1723}
1724
1725static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1726                                struct ceph_object_id *oid)
1727{
1728        if (snap_id == CEPH_NOSNAP)
1729                ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1730                                rbd_dev->spec->image_id);
1731        else
1732                ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1733                                rbd_dev->spec->image_id, snap_id);
1734}
1735
/*
 * Take the exclusive RBD_LOCK_NAME class lock on the HEAD object map
 * object.
 *
 * If the lock is busy, look up its current lockers, break the lock held
 * by the first one and retry.  Only one break is attempted: once
 * broke_lock is set, whatever the next ceph_cls_lock() call returns is
 * final.
 *
 * Returns 0 on success (including -EEXIST from ceph_cls_lock(), which is
 * treated as "already locked by myself"), negative error code otherwise.
 */
static int rbd_object_map_lock(struct rbd_device *rbd_dev)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        CEPH_DEFINE_OID_ONSTACK(oid);
        u8 lock_type;
        char *lock_tag;
        struct ceph_locker *lockers;
        u32 num_lockers;
        bool broke_lock = false;
        int ret;

        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

again:
        ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
                            CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
        /* anything but "busy" is final, and so is "busy" after a break */
        if (ret != -EBUSY || broke_lock) {
                if (ret == -EEXIST)
                        ret = 0; /* already locked by myself */
                if (ret)
                        rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
                return ret;
        }

        ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
                                 RBD_LOCK_NAME, &lock_type, &lock_tag,
                                 &lockers, &num_lockers);
        if (ret) {
                if (ret == -ENOENT)
                        goto again;

                rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
                return ret;
        }

        /* only the lockers are of interest, the tag is not used */
        kfree(lock_tag);
        if (num_lockers == 0)
                goto again;

        rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
                 ENTITY_NAME(lockers[0].id.name));

        ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
                                  RBD_LOCK_NAME, lockers[0].id.cookie,
                                  &lockers[0].id.name);
        ceph_free_lockers(lockers, num_lockers);
        if (ret) {
                if (ret == -ENOENT)
                        goto again;

                rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
                return ret;
        }

        broke_lock = true;
        goto again;
}
1793
1794static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1795{
1796        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1797        CEPH_DEFINE_OID_ONSTACK(oid);
1798        int ret;
1799
1800        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1801
1802        ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1803                              "");
1804        if (ret && ret != -ENOENT)
1805                rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1806}
1807
1808static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1809{
1810        u8 struct_v;
1811        u32 struct_len;
1812        u32 header_len;
1813        void *header_end;
1814        int ret;
1815
1816        ceph_decode_32_safe(p, end, header_len, e_inval);
1817        header_end = *p + header_len;
1818
1819        ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1820                                  &struct_len);
1821        if (ret)
1822                return ret;
1823
1824        ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1825
1826        *p = header_end;
1827        return 0;
1828
1829e_inval:
1830        return -EINVAL;
1831}
1832
/*
 * Fetch the object map for the mapped snap_id (HEAD or snapshot) with the
 * "rbd" class "object_map_load" method and stash a copy of its bit array
 * in rbd_dev->object_map.
 *
 * The expected number of objects is derived from the image layout and
 * the mapping size; a reply whose header reports a different number, or
 * that is too short to hold the bit array, is rejected with -EINVAL.
 *
 * Must be called with no object map loaded (asserted).  Returns 0 on
 * success, negative error code otherwise.
 */
static int __rbd_object_map_load(struct rbd_device *rbd_dev)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        CEPH_DEFINE_OID_ONSTACK(oid);
        struct page **pages;
        void *p, *end;
        size_t reply_len;
        u64 num_objects;
        u64 object_map_bytes;
        u64 object_map_size;
        int num_pages;
        int ret;

        rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);

        num_objects = ceph_get_num_objects(&rbd_dev->layout,
                                           rbd_dev->mapping.size);
        object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
                                            BITS_PER_BYTE);
        /*
         * One page more than the bit array itself needs -- presumably to
         * leave room for the encoded header (and any trailer) around it.
         */
        num_pages = calc_pages_for(0, object_map_bytes) + 1;
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        reply_len = num_pages * PAGE_SIZE;
        rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
        ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
                             "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
                             NULL, 0, pages, &reply_len);
        if (ret)
                goto out;

        /* the header is decoded from the first page only */
        p = page_address(pages[0]);
        end = p + min(reply_len, (size_t)PAGE_SIZE);
        ret = decode_object_map_header(&p, end, &object_map_size);
        if (ret)
                goto out;

        if (object_map_size != num_objects) {
                rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
                         object_map_size, num_objects);
                ret = -EINVAL;
                goto out;
        }

        /* the bit array starts right after the header */
        if (offset_in_page(p) + object_map_bytes > reply_len) {
                ret = -EINVAL;
                goto out;
        }

        rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
        if (!rbd_dev->object_map) {
                ret = -ENOMEM;
                goto out;
        }

        rbd_dev->object_map_size = object_map_size;
        ceph_copy_from_page_vector(pages, rbd_dev->object_map,
                                   offset_in_page(p), object_map_bytes);

out:
        /* ret is 0 here on the success path */
        ceph_release_page_vector(pages, num_pages);
        return ret;
}
1897
1898static void rbd_object_map_free(struct rbd_device *rbd_dev)
1899{
1900        kvfree(rbd_dev->object_map);
1901        rbd_dev->object_map = NULL;
1902        rbd_dev->object_map_size = 0;
1903}
1904
1905static int rbd_object_map_load(struct rbd_device *rbd_dev)
1906{
1907        int ret;
1908
1909        ret = __rbd_object_map_load(rbd_dev);
1910        if (ret)
1911                return ret;
1912
1913        ret = rbd_dev_v2_get_flags(rbd_dev);
1914        if (ret) {
1915                rbd_object_map_free(rbd_dev);
1916                return ret;
1917        }
1918
1919        if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1920                rbd_warn(rbd_dev, "object map is invalid");
1921
1922        return 0;
1923}
1924
/*
 * Lock the object map and load it.  The lock is released again if the
 * load fails.
 *
 * Returns 0 on success, negative error code otherwise.
 */
static int rbd_object_map_open(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_object_map_lock(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_object_map_load(rbd_dev);
        if (ret)
                rbd_object_map_unlock(rbd_dev);

        return ret;
}
1941
/*
 * Counterpart of rbd_object_map_open(): drop the in-memory object map
 * and release the object map lock.
 */
static void rbd_object_map_close(struct rbd_device *rbd_dev)
{
        rbd_object_map_free(rbd_dev);
        rbd_object_map_unlock(rbd_dev);
}
1947
1948/*
1949 * This function needs snap_id (or more precisely just something to
1950 * distinguish between HEAD and snapshot object maps), new_state and
1951 * current_state that were passed to rbd_object_map_update().
1952 *
1953 * To avoid allocating and stashing a context we piggyback on the OSD
1954 * request.  A HEAD update has two ops (assert_locked).  For new_state
1955 * and current_state we decode our own object_map_update op, encoded in
1956 * rbd_cls_object_map_update().
1957 */
1958static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1959                                        struct ceph_osd_request *osd_req)
1960{
1961        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1962        struct ceph_osd_data *osd_data;
1963        u64 objno;
1964        u8 state, new_state, current_state;
1965        bool has_current_state;
1966        void *p;
1967
1968        if (osd_req->r_result)
1969                return osd_req->r_result;
1970
1971        /*
1972         * Nothing to do for a snapshot object map.
1973         */
1974        if (osd_req->r_num_ops == 1)
1975                return 0;
1976
1977        /*
1978         * Update in-memory HEAD object map.
1979         */
1980        rbd_assert(osd_req->r_num_ops == 2);
1981        osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1982        rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1983
1984        p = page_address(osd_data->pages[0]);
1985        objno = ceph_decode_64(&p);
1986        rbd_assert(objno == obj_req->ex.oe_objno);
1987        rbd_assert(ceph_decode_64(&p) == objno + 1);
1988        new_state = ceph_decode_8(&p);
1989        has_current_state = ceph_decode_8(&p);
1990        if (has_current_state)
1991                current_state = ceph_decode_8(&p);
1992
1993        spin_lock(&rbd_dev->object_map_lock);
1994        state = __rbd_object_map_get(rbd_dev, objno);
1995        if (!has_current_state || current_state == state ||
1996            (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1997                __rbd_object_map_set(rbd_dev, objno, new_state);
1998        spin_unlock(&rbd_dev->object_map_lock);
1999
2000        return 0;
2001}
2002
2003static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2004{
2005        struct rbd_obj_request *obj_req = osd_req->r_priv;
2006        int result;
2007
2008        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2009             osd_req->r_result, obj_req);
2010
2011        result = rbd_object_map_update_finish(obj_req, osd_req);
2012        rbd_obj_handle_request(obj_req, result);
2013}
2014
2015static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2016{
2017        u8 state = rbd_object_map_get(rbd_dev, objno);
2018
2019        if (state == new_state ||
2020            (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2021            (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2022                return false;
2023
2024        return true;
2025}
2026
2027static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2028                                     int which, u64 objno, u8 new_state,
2029                                     const u8 *current_state)
2030{
2031        struct page **pages;
2032        void *p, *start;
2033        int ret;
2034
2035        ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2036        if (ret)
2037                return ret;
2038
2039        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2040        if (IS_ERR(pages))
2041                return PTR_ERR(pages);
2042
2043        p = start = page_address(pages[0]);
2044        ceph_encode_64(&p, objno);
2045        ceph_encode_64(&p, objno + 1);
2046        ceph_encode_8(&p, new_state);
2047        if (current_state) {
2048                ceph_encode_8(&p, 1);
2049                ceph_encode_8(&p, *current_state);
2050        } else {
2051                ceph_encode_8(&p, 0);
2052        }
2053
2054        osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2055                                          false, true);
2056        return 0;
2057}
2058
/*
 * Build and submit an object map update for the object targeted by
 * @obj_req, for either the HEAD (@snap_id == CEPH_NOSNAP) or a snapshot
 * object map.  @current_state may be NULL, see
 * rbd_cls_object_map_update().
 *
 * The request is linked into obj_req->osd_reqs right after allocation,
 * before any of the steps that can fail.
 *
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error
 */
static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
                                 u8 new_state, const u8 *current_state)
{
        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_request *req;
        int num_ops = 1;
        int which = 0;
        int ret;

        /* only HEAD updates are checked against the in-memory object map */
        if (snap_id == CEPH_NOSNAP) {
                if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
                        return 1;

                num_ops++; /* assert_locked */
        }

        req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
        if (!req)
                return -ENOMEM;

        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
        req->r_callback = rbd_object_map_callback;
        req->r_priv = obj_req;

        rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
        req->r_flags = CEPH_OSD_FLAG_WRITE;
        ktime_get_real_ts64(&req->r_mtime);

        if (snap_id == CEPH_NOSNAP) {
                /*
                 * Protect against possible race conditions during lock
                 * ownership transitions.
                 */
                ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
                                             CEPH_CLS_LOCK_EXCLUSIVE, "", "");
                if (ret)
                        return ret;
        }

        /* the update itself is always the last op */
        ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
                                        new_state, current_state);
        if (ret)
                return ret;

        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
        if (ret)
                return ret;

        ceph_osdc_start_request(osdc, req, false);
        return 0;
}
2118
2119static void prune_extents(struct ceph_file_extent *img_extents,
2120                          u32 *num_img_extents, u64 overlap)
2121{
2122        u32 cnt = *num_img_extents;
2123
2124        /* drop extents completely beyond the overlap */
2125        while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2126                cnt--;
2127
2128        if (cnt) {
2129                struct ceph_file_extent *ex = &img_extents[cnt - 1];
2130
2131                /* trim final overlapping extent */
2132                if (ex->fe_off + ex->fe_len > overlap)
2133                        ex->fe_len = overlap - ex->fe_off;
2134        }
2135
2136        *num_img_extents = cnt;
2137}
2138
2139/*
2140 * Determine the byte range(s) covered by either just the object extent
2141 * or the entire object in the parent image.
2142 */
2143static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2144                                    bool entire)
2145{
2146        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2147        int ret;
2148
2149        if (!rbd_dev->parent_overlap)
2150                return 0;
2151
2152        ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2153                                  entire ? 0 : obj_req->ex.oe_off,
2154                                  entire ? rbd_dev->layout.object_size :
2155                                                        obj_req->ex.oe_len,
2156                                  &obj_req->img_extents,
2157                                  &obj_req->num_img_extents);
2158        if (ret)
2159                return ret;
2160
2161        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2162                      rbd_dev->parent_overlap);
2163        return 0;
2164}
2165
2166static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2167{
2168        struct rbd_obj_request *obj_req = osd_req->r_priv;
2169
2170        switch (obj_req->img_request->data_type) {
2171        case OBJ_REQUEST_BIO:
2172                osd_req_op_extent_osd_data_bio(osd_req, which,
2173                                               &obj_req->bio_pos,
2174                                               obj_req->ex.oe_len);
2175                break;
2176        case OBJ_REQUEST_BVECS:
2177        case OBJ_REQUEST_OWN_BVECS:
2178                rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2179                                                        obj_req->ex.oe_len);
2180                rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2181                osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2182                                                    &obj_req->bvec_pos);
2183                break;
2184        default:
2185                BUG();
2186        }
2187}
2188
2189static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2190{
2191        struct page **pages;
2192
2193        /*
2194         * The response data for a STAT call consists of:
2195         *     le64 length;
2196         *     struct {
2197         *         le32 tv_sec;
2198         *         le32 tv_nsec;
2199         *     } mtime;
2200         */
2201        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2202        if (IS_ERR(pages))
2203                return PTR_ERR(pages);
2204
2205        osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2206        osd_req_op_raw_data_in_pages(osd_req, which, pages,
2207                                     8 + sizeof(struct ceph_timespec),
2208                                     0, false, true);
2209        return 0;
2210}
2211
2212static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2213                                u32 bytes)
2214{
2215        struct rbd_obj_request *obj_req = osd_req->r_priv;
2216        int ret;
2217
2218        ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2219        if (ret)
2220                return ret;
2221
2222        osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2223                                          obj_req->copyup_bvec_count, bytes);
2224        return 0;
2225}
2226
/*
 * Prepare the read state machine of @obj_req for execution.
 * Always returns 0.
 */
static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
{
        obj_req->read_state = RBD_OBJ_READ_START;
        return 0;
}
2232
2233static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2234                                      int which)
2235{
2236        struct rbd_obj_request *obj_req = osd_req->r_priv;
2237        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2238        u16 opcode;
2239
2240        if (!use_object_map(rbd_dev) ||
2241            !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2242                osd_req_op_alloc_hint_init(osd_req, which++,
2243                                           rbd_dev->layout.object_size,
2244                                           rbd_dev->layout.object_size,
2245                                           rbd_dev->opts->alloc_hint_flags);
2246        }
2247
2248        if (rbd_obj_is_entire(obj_req))
2249                opcode = CEPH_OSD_OP_WRITEFULL;
2250        else
2251                opcode = CEPH_OSD_OP_WRITE;
2252
2253        osd_req_op_extent_init(osd_req, which, opcode,
2254                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2255        rbd_osd_setup_data(osd_req, which);
2256}
2257
2258static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2259{
2260        int ret;
2261
2262        /* reverse map the entire object onto the parent */
2263        ret = rbd_obj_calc_img_extents(obj_req, true);
2264        if (ret)
2265                return ret;
2266
2267        if (rbd_obj_copyup_enabled(obj_req))
2268                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2269
2270        obj_req->write_state = RBD_OBJ_WRITE_START;
2271        return 0;
2272}
2273
2274static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2275{
2276        return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2277                                          CEPH_OSD_OP_ZERO;
2278}
2279
2280static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2281                                        int which)
2282{
2283        struct rbd_obj_request *obj_req = osd_req->r_priv;
2284
2285        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2286                rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2287                osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2288        } else {
2289                osd_req_op_extent_init(osd_req, which,
2290                                       truncate_or_zero_opcode(obj_req),
2291                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2292                                       0, 0);
2293        }
2294}
2295
2296static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2297{
2298        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2299        u64 off, next_off;
2300        int ret;
2301
2302        /*
2303         * Align the range to alloc_size boundary and punt on discards
2304         * that are too small to free up any space.
2305         *
2306         * alloc_size == object_size && is_tail() is a special case for
2307         * filestore with filestore_punch_hole = false, needed to allow
2308         * truncate (in addition to delete).
2309         */
2310        if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2311            !rbd_obj_is_tail(obj_req)) {
2312                off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2313                next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2314                                      rbd_dev->opts->alloc_size);
2315                if (off >= next_off)
2316                        return 1;
2317
2318                dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2319                     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2320                     off, next_off - off);
2321                obj_req->ex.oe_off = off;
2322                obj_req->ex.oe_len = next_off - off;
2323        }
2324
2325        /* reverse map the entire object onto the parent */
2326        ret = rbd_obj_calc_img_extents(obj_req, true);
2327        if (ret)
2328                return ret;
2329
2330        obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2331        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2332                obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2333
2334        obj_req->write_state = RBD_OBJ_WRITE_START;
2335        return 0;
2336}
2337
2338static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2339                                        int which)
2340{
2341        struct rbd_obj_request *obj_req = osd_req->r_priv;
2342        u16 opcode;
2343
2344        if (rbd_obj_is_entire(obj_req)) {
2345                if (obj_req->num_img_extents) {
2346                        if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2347                                osd_req_op_init(osd_req, which++,
2348                                                CEPH_OSD_OP_CREATE, 0);
2349                        opcode = CEPH_OSD_OP_TRUNCATE;
2350                } else {
2351                        rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2352                        osd_req_op_init(osd_req, which++,
2353                                        CEPH_OSD_OP_DELETE, 0);
2354                        opcode = 0;
2355                }
2356        } else {
2357                opcode = truncate_or_zero_opcode(obj_req);
2358        }
2359
2360        if (opcode)
2361                osd_req_op_extent_init(osd_req, which, opcode,
2362                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2363                                       0, 0);
2364}
2365
2366static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2367{
2368        int ret;
2369
2370        /* reverse map the entire object onto the parent */
2371        ret = rbd_obj_calc_img_extents(obj_req, true);
2372        if (ret)
2373                return ret;
2374
2375        if (rbd_obj_copyup_enabled(obj_req))
2376                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2377        if (!obj_req->num_img_extents) {
2378                obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2379                if (rbd_obj_is_entire(obj_req))
2380                        obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2381        }
2382
2383        obj_req->write_state = RBD_OBJ_WRITE_START;
2384        return 0;
2385}
2386
2387static int count_write_ops(struct rbd_obj_request *obj_req)
2388{
2389        struct rbd_img_request *img_req = obj_req->img_request;
2390
2391        switch (img_req->op_type) {
2392        case OBJ_OP_WRITE:
2393                if (!use_object_map(img_req->rbd_dev) ||
2394                    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2395                        return 2; /* setallochint + write/writefull */
2396
2397                return 1; /* write/writefull */
2398        case OBJ_OP_DISCARD:
2399                return 1; /* delete/truncate/zero */
2400        case OBJ_OP_ZEROOUT:
2401                if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2402                    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2403                        return 2; /* create + truncate */
2404
2405                return 1; /* delete/truncate/zero */
2406        default:
2407                BUG();
2408        }
2409}
2410
2411static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2412                                    int which)
2413{
2414        struct rbd_obj_request *obj_req = osd_req->r_priv;
2415
2416        switch (obj_req->img_request->op_type) {
2417        case OBJ_OP_WRITE:
2418                __rbd_osd_setup_write_ops(osd_req, which);
2419                break;
2420        case OBJ_OP_DISCARD:
2421                __rbd_osd_setup_discard_ops(osd_req, which);
2422                break;
2423        case OBJ_OP_ZEROOUT:
2424                __rbd_osd_setup_zeroout_ops(osd_req, which);
2425                break;
2426        default:
2427                BUG();
2428        }
2429}
2430
2431/*
2432 * Prune the list of object requests (adjust offset and/or length, drop
2433 * redundant requests).  Prepare object request state machines and image
2434 * request state machine for execution.
2435 */
2436static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2437{
2438        struct rbd_obj_request *obj_req, *next_obj_req;
2439        int ret;
2440
2441        for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2442                switch (img_req->op_type) {
2443                case OBJ_OP_READ:
2444                        ret = rbd_obj_init_read(obj_req);
2445                        break;
2446                case OBJ_OP_WRITE:
2447                        ret = rbd_obj_init_write(obj_req);
2448                        break;
2449                case OBJ_OP_DISCARD:
2450                        ret = rbd_obj_init_discard(obj_req);
2451                        break;
2452                case OBJ_OP_ZEROOUT:
2453                        ret = rbd_obj_init_zeroout(obj_req);
2454                        break;
2455                default:
2456                        BUG();
2457                }
2458                if (ret < 0)
2459                        return ret;
2460                if (ret > 0) {
2461                        rbd_img_obj_request_del(img_req, obj_req);
2462                        continue;
2463                }
2464        }
2465
2466        img_req->state = RBD_IMG_START;
2467        return 0;
2468}
2469
/*
 * Position in the data buffer an image request is filled from: either
 * a bio iterator or a bio_vec iterator.
 */
union rbd_img_fill_iter {
        struct ceph_bio_iter    bio_iter;
        struct ceph_bvec_iter   bvec_iter;
};
2474
/*
 * Context for rbd_img_fill_request() and rbd_img_fill_request_nocopy().
 */
struct rbd_img_fill_ctx {
        /* becomes the image request's data_type on the nocopy path */
        enum obj_request_type   pos_type;
        /* caller's starting position, copied into ->iter before each pass */
        union rbd_img_fill_iter *pos;
        /* working position, advanced by the callbacks below */
        union rbd_img_fill_iter iter;
        /* nocopy path: record each object request's starting position */
        ceph_object_extent_fn_t set_pos_fn;
        /* copy path, first pass: count bio_vecs per object request */
        ceph_object_extent_fn_t count_fn;
        /* copy path, second pass: fill per object request bio_vec arrays */
        ceph_object_extent_fn_t copy_fn;
};
2483
2484static struct ceph_object_extent *alloc_object_extent(void *arg)
2485{
2486        struct rbd_img_request *img_req = arg;
2487        struct rbd_obj_request *obj_req;
2488
2489        obj_req = rbd_obj_request_create();
2490        if (!obj_req)
2491                return NULL;
2492
2493        rbd_img_obj_request_add(img_req, obj_req);
2494        return &obj_req->ex;
2495}
2496
/*
 * Return true if the layout needs the copying (own bio_vecs) path, i.e.
 * if its stripe unit differs from its object size.
 *
 * While su != os && sc == 1 is technically not fancy (it's the same
 * layout as su == os && sc == 1), we can't use the nocopy path for it
 * because ->set_pos_fn() should be called only once per object.
 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
 * treat su != os && sc == 1 as fancy.
 */
static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
{
        return l->stripe_unit != l->object_size;
}
2508
2509static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2510                                       struct ceph_file_extent *img_extents,
2511                                       u32 num_img_extents,
2512                                       struct rbd_img_fill_ctx *fctx)
2513{
2514        u32 i;
2515        int ret;
2516
2517        img_req->data_type = fctx->pos_type;
2518
2519        /*
2520         * Create object requests and set each object request's starting
2521         * position in the provided bio (list) or bio_vec array.
2522         */
2523        fctx->iter = *fctx->pos;
2524        for (i = 0; i < num_img_extents; i++) {
2525                ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2526                                           img_extents[i].fe_off,
2527                                           img_extents[i].fe_len,
2528                                           &img_req->object_extents,
2529                                           alloc_object_extent, img_req,
2530                                           fctx->set_pos_fn, &fctx->iter);
2531                if (ret)
2532                        return ret;
2533        }
2534
2535        return __rbd_img_fill_request(img_req);
2536}
2537
2538/*
2539 * Map a list of image extents to a list of object extents, create the
2540 * corresponding object requests (normally each to a different object,
2541 * but not always) and add them to @img_req.  For each object request,
2542 * set up its data descriptor to point to the corresponding chunk(s) of
2543 * @fctx->pos data buffer.
2544 *
2545 * Because ceph_file_to_extents() will merge adjacent object extents
2546 * together, each object request's data descriptor may point to multiple
2547 * different chunks of @fctx->pos data buffer.
2548 *
2549 * @fctx->pos data buffer is assumed to be large enough.
2550 */
2551static int rbd_img_fill_request(struct rbd_img_request *img_req,
2552                                struct ceph_file_extent *img_extents,
2553                                u32 num_img_extents,
2554                                struct rbd_img_fill_ctx *fctx)
2555{
2556        struct rbd_device *rbd_dev = img_req->rbd_dev;
2557        struct rbd_obj_request *obj_req;
2558        u32 i;
2559        int ret;
2560
2561        if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2562            !rbd_layout_is_fancy(&rbd_dev->layout))
2563                return rbd_img_fill_request_nocopy(img_req, img_extents,
2564                                                   num_img_extents, fctx);
2565
2566        img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2567
2568        /*
2569         * Create object requests and determine ->bvec_count for each object
2570         * request.  Note that ->bvec_count sum over all object requests may
2571         * be greater than the number of bio_vecs in the provided bio (list)
2572         * or bio_vec array because when mapped, those bio_vecs can straddle
2573         * stripe unit boundaries.
2574         */
2575        fctx->iter = *fctx->pos;
2576        for (i = 0; i < num_img_extents; i++) {
2577                ret = ceph_file_to_extents(&rbd_dev->layout,
2578                                           img_extents[i].fe_off,
2579                                           img_extents[i].fe_len,
2580                                           &img_req->object_extents,
2581                                           alloc_object_extent, img_req,
2582                                           fctx->count_fn, &fctx->iter);
2583                if (ret)
2584                        return ret;
2585        }
2586
2587        for_each_obj_request(img_req, obj_req) {
2588                obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2589                                              sizeof(*obj_req->bvec_pos.bvecs),
2590                                              GFP_NOIO);
2591                if (!obj_req->bvec_pos.bvecs)
2592                        return -ENOMEM;
2593        }
2594
2595        /*
2596         * Fill in each object request's private bio_vec array, splitting and
2597         * rearranging the provided bio_vecs in stripe unit chunks as needed.
2598         */
2599        fctx->iter = *fctx->pos;
2600        for (i = 0; i < num_img_extents; i++) {
2601                ret = ceph_iterate_extents(&rbd_dev->layout,
2602                                           img_extents[i].fe_off,
2603                                           img_extents[i].fe_len,
2604                                           &img_req->object_extents,
2605                                           fctx->copy_fn, &fctx->iter);
2606                if (ret)
2607                        return ret;
2608        }
2609
2610        return __rbd_img_fill_request(img_req);
2611}
2612
2613static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2614                               u64 off, u64 len)
2615{
2616        struct ceph_file_extent ex = { off, len };
2617        union rbd_img_fill_iter dummy = {};
2618        struct rbd_img_fill_ctx fctx = {
2619                .pos_type = OBJ_REQUEST_NODATA,
2620                .pos = &dummy,
2621        };
2622
2623        return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2624}
2625
2626static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2627{
2628        struct rbd_obj_request *obj_req =
2629            container_of(ex, struct rbd_obj_request, ex);
2630        struct ceph_bio_iter *it = arg;
2631
2632        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2633        obj_req->bio_pos = *it;
2634        ceph_bio_iter_advance(it, bytes);
2635}
2636
2637static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2638{
2639        struct rbd_obj_request *obj_req =
2640            container_of(ex, struct rbd_obj_request, ex);
2641        struct ceph_bio_iter *it = arg;
2642
2643        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2644        ceph_bio_iter_advance_step(it, bytes, ({
2645                obj_req->bvec_count++;
2646        }));
2647
2648}
2649
2650static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2651{
2652        struct rbd_obj_request *obj_req =
2653            container_of(ex, struct rbd_obj_request, ex);
2654        struct ceph_bio_iter *it = arg;
2655
2656        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2657        ceph_bio_iter_advance_step(it, bytes, ({
2658                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2659                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2660        }));
2661}
2662
2663static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2664                                   struct ceph_file_extent *img_extents,
2665                                   u32 num_img_extents,
2666                                   struct ceph_bio_iter *bio_pos)
2667{
2668        struct rbd_img_fill_ctx fctx = {
2669                .pos_type = OBJ_REQUEST_BIO,
2670                .pos = (union rbd_img_fill_iter *)bio_pos,
2671                .set_pos_fn = set_bio_pos,
2672                .count_fn = count_bio_bvecs,
2673                .copy_fn = copy_bio_bvecs,
2674        };
2675
2676        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2677                                    &fctx);
2678}
2679
2680static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2681                                 u64 off, u64 len, struct bio *bio)
2682{
2683        struct ceph_file_extent ex = { off, len };
2684        struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2685
2686        return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2687}
2688
2689static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2690{
2691        struct rbd_obj_request *obj_req =
2692            container_of(ex, struct rbd_obj_request, ex);
2693        struct ceph_bvec_iter *it = arg;
2694
2695        obj_req->bvec_pos = *it;
2696        ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2697        ceph_bvec_iter_advance(it, bytes);
2698}
2699
2700static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2701{
2702        struct rbd_obj_request *obj_req =
2703            container_of(ex, struct rbd_obj_request, ex);
2704        struct ceph_bvec_iter *it = arg;
2705
2706        ceph_bvec_iter_advance_step(it, bytes, ({
2707                obj_req->bvec_count++;
2708        }));
2709}
2710
2711static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2712{
2713        struct rbd_obj_request *obj_req =
2714            container_of(ex, struct rbd_obj_request, ex);
2715        struct ceph_bvec_iter *it = arg;
2716
2717        ceph_bvec_iter_advance_step(it, bytes, ({
2718                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2719                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2720        }));
2721}
2722
2723static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2724                                     struct ceph_file_extent *img_extents,
2725                                     u32 num_img_extents,
2726                                     struct ceph_bvec_iter *bvec_pos)
2727{
2728        struct rbd_img_fill_ctx fctx = {
2729                .pos_type = OBJ_REQUEST_BVECS,
2730                .pos = (union rbd_img_fill_iter *)bvec_pos,
2731                .set_pos_fn = set_bvec_pos,
2732                .count_fn = count_bvecs,
2733                .copy_fn = copy_bvecs,
2734        };
2735
2736        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2737                                    &fctx);
2738}
2739
2740static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2741                                   struct ceph_file_extent *img_extents,
2742                                   u32 num_img_extents,
2743                                   struct bio_vec *bvecs)
2744{
2745        struct ceph_bvec_iter it = {
2746                .bvecs = bvecs,
2747                .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2748                                                             num_img_extents) },
2749        };
2750
2751        return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2752                                         &it);
2753}
2754
2755static void rbd_img_handle_request_work(struct work_struct *work)
2756{
2757        struct rbd_img_request *img_req =
2758            container_of(work, struct rbd_img_request, work);
2759
2760        rbd_img_handle_request(img_req, img_req->work_result);
2761}
2762
2763static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2764{
2765        INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2766        img_req->work_result = result;
2767        queue_work(rbd_wq, &img_req->work);
2768}
2769
2770static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2771{
2772        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2773
2774        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2775                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2776                return true;
2777        }
2778
2779        dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2780             obj_req->ex.oe_objno);
2781        return false;
2782}
2783
2784static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2785{
2786        struct ceph_osd_request *osd_req;
2787        int ret;
2788
2789        osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2790        if (IS_ERR(osd_req))
2791                return PTR_ERR(osd_req);
2792
2793        osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2794                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2795        rbd_osd_setup_data(osd_req, 0);
2796        rbd_osd_format_read(osd_req);
2797
2798        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2799        if (ret)
2800                return ret;
2801
2802        rbd_osd_submit(osd_req);
2803        return 0;
2804}
2805
2806static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2807{
2808        struct rbd_img_request *img_req = obj_req->img_request;
2809        struct rbd_device *parent = img_req->rbd_dev->parent;
2810        struct rbd_img_request *child_img_req;
2811        int ret;
2812
2813        child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2814        if (!child_img_req)
2815                return -ENOMEM;
2816
2817        rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2818        __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2819        child_img_req->obj_request = obj_req;
2820
2821        down_read(&parent->header_rwsem);
2822        rbd_img_capture_header(child_img_req);
2823        up_read(&parent->header_rwsem);
2824
2825        dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2826             obj_req);
2827
2828        if (!rbd_img_is_write(img_req)) {
2829                switch (img_req->data_type) {
2830                case OBJ_REQUEST_BIO:
2831                        ret = __rbd_img_fill_from_bio(child_img_req,
2832                                                      obj_req->img_extents,
2833                                                      obj_req->num_img_extents,
2834                                                      &obj_req->bio_pos);
2835                        break;
2836                case OBJ_REQUEST_BVECS:
2837                case OBJ_REQUEST_OWN_BVECS:
2838                        ret = __rbd_img_fill_from_bvecs(child_img_req,
2839                                                      obj_req->img_extents,
2840                                                      obj_req->num_img_extents,
2841                                                      &obj_req->bvec_pos);
2842                        break;
2843                default:
2844                        BUG();
2845                }
2846        } else {
2847                ret = rbd_img_fill_from_bvecs(child_img_req,
2848                                              obj_req->img_extents,
2849                                              obj_req->num_img_extents,
2850                                              obj_req->copyup_bvecs);
2851        }
2852        if (ret) {
2853                rbd_img_request_destroy(child_img_req);
2854                return ret;
2855        }
2856
2857        /* avoid parent chain recursion */
2858        rbd_img_schedule(child_img_req, 0);
2859        return 0;
2860}
2861
2862static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2863{
2864        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2865        int ret;
2866
2867again:
2868        switch (obj_req->read_state) {
2869        case RBD_OBJ_READ_START:
2870                rbd_assert(!*result);
2871
2872                if (!rbd_obj_may_exist(obj_req)) {
2873                        *result = -ENOENT;
2874                        obj_req->read_state = RBD_OBJ_READ_OBJECT;
2875                        goto again;
2876                }
2877
2878                ret = rbd_obj_read_object(obj_req);
2879                if (ret) {
2880                        *result = ret;
2881                        return true;
2882                }
2883                obj_req->read_state = RBD_OBJ_READ_OBJECT;
2884                return false;
2885        case RBD_OBJ_READ_OBJECT:
2886                if (*result == -ENOENT && rbd_dev->parent_overlap) {
2887                        /* reverse map this object extent onto the parent */
2888                        ret = rbd_obj_calc_img_extents(obj_req, false);
2889                        if (ret) {
2890                                *result = ret;
2891                                return true;
2892                        }
2893                        if (obj_req->num_img_extents) {
2894                                ret = rbd_obj_read_from_parent(obj_req);
2895                                if (ret) {
2896                                        *result = ret;
2897                                        return true;
2898                                }
2899                                obj_req->read_state = RBD_OBJ_READ_PARENT;
2900                                return false;
2901                        }
2902                }
2903
2904                /*
2905                 * -ENOENT means a hole in the image -- zero-fill the entire
2906                 * length of the request.  A short read also implies zero-fill
2907                 * to the end of the request.
2908                 */
2909                if (*result == -ENOENT) {
2910                        rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2911                        *result = 0;
2912                } else if (*result >= 0) {
2913                        if (*result < obj_req->ex.oe_len)
2914                                rbd_obj_zero_range(obj_req, *result,
2915                                                obj_req->ex.oe_len - *result);
2916                        else
2917                                rbd_assert(*result == obj_req->ex.oe_len);
2918                        *result = 0;
2919                }
2920                return true;
2921        case RBD_OBJ_READ_PARENT:
2922                /*
2923                 * The parent image is read only up to the overlap -- zero-fill
2924                 * from the overlap to the end of the request.
2925                 */
2926                if (!*result) {
2927                        u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2928
2929                        if (obj_overlap < obj_req->ex.oe_len)
2930                                rbd_obj_zero_range(obj_req, obj_overlap,
2931                                            obj_req->ex.oe_len - obj_overlap);
2932                }
2933                return true;
2934        default:
2935                BUG();
2936        }
2937}
2938
2939static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2940{
2941        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2942
2943        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2944                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2945
2946        if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2947            (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2948                dout("%s %p noop for nonexistent\n", __func__, obj_req);
2949                return true;
2950        }
2951
2952        return false;
2953}
2954
2955/*
2956 * Return:
2957 *   0 - object map update sent
2958 *   1 - object map update isn't needed
2959 *  <0 - error
2960 */
2961static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2962{
2963        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2964        u8 new_state;
2965
2966        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2967                return 1;
2968
2969        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2970                new_state = OBJECT_PENDING;
2971        else
2972                new_state = OBJECT_EXISTS;
2973
2974        return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2975}
2976
2977static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2978{
2979        struct ceph_osd_request *osd_req;
2980        int num_ops = count_write_ops(obj_req);
2981        int which = 0;
2982        int ret;
2983
2984        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2985                num_ops++; /* stat */
2986
2987        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2988        if (IS_ERR(osd_req))
2989                return PTR_ERR(osd_req);
2990
2991        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2992                ret = rbd_osd_setup_stat(osd_req, which++);
2993                if (ret)
2994                        return ret;
2995        }
2996
2997        rbd_osd_setup_write_ops(osd_req, which);
2998        rbd_osd_format_write(osd_req);
2999
3000        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3001        if (ret)
3002                return ret;
3003
3004        rbd_osd_submit(osd_req);
3005        return 0;
3006}
3007
3008/*
3009 * copyup_bvecs pages are never highmem pages
3010 */
3011static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3012{
3013        struct ceph_bvec_iter it = {
3014                .bvecs = bvecs,
3015                .iter = { .bi_size = bytes },
3016        };
3017
3018        ceph_bvec_iter_advance_step(&it, bytes, ({
3019                if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3020                               bv.bv_len))
3021                        return false;
3022        }));
3023        return true;
3024}
3025
3026#define MODS_ONLY       U32_MAX
3027
3028static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3029                                      u32 bytes)
3030{
3031        struct ceph_osd_request *osd_req;
3032        int ret;
3033
3034        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3035        rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3036
3037        osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3038        if (IS_ERR(osd_req))
3039                return PTR_ERR(osd_req);
3040
3041        ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3042        if (ret)
3043                return ret;
3044
3045        rbd_osd_format_write(osd_req);
3046
3047        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3048        if (ret)
3049                return ret;
3050
3051        rbd_osd_submit(osd_req);
3052        return 0;
3053}
3054
3055static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3056                                        u32 bytes)
3057{
3058        struct ceph_osd_request *osd_req;
3059        int num_ops = count_write_ops(obj_req);
3060        int which = 0;
3061        int ret;
3062
3063        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3064
3065        if (bytes != MODS_ONLY)
3066                num_ops++; /* copyup */
3067
3068        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3069        if (IS_ERR(osd_req))
3070                return PTR_ERR(osd_req);
3071
3072        if (bytes != MODS_ONLY) {
3073                ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3074                if (ret)
3075                        return ret;
3076        }
3077
3078        rbd_osd_setup_write_ops(osd_req, which);
3079        rbd_osd_format_write(osd_req);
3080
3081        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3082        if (ret)
3083                return ret;
3084
3085        rbd_osd_submit(osd_req);
3086        return 0;
3087}
3088
3089static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3090{
3091        u32 i;
3092
3093        rbd_assert(!obj_req->copyup_bvecs);
3094        obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3095        obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3096                                        sizeof(*obj_req->copyup_bvecs),
3097                                        GFP_NOIO);
3098        if (!obj_req->copyup_bvecs)
3099                return -ENOMEM;
3100
3101        for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3102                unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3103
3104                obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3105                if (!obj_req->copyup_bvecs[i].bv_page)
3106                        return -ENOMEM;
3107
3108                obj_req->copyup_bvecs[i].bv_offset = 0;
3109                obj_req->copyup_bvecs[i].bv_len = len;
3110                obj_overlap -= len;
3111        }
3112
3113        rbd_assert(!obj_overlap);
3114        return 0;
3115}
3116
3117/*
3118 * The target object doesn't exist.  Read the data for the entire
3119 * target object up to the overlap point (if any) from the parent,
3120 * so we can use it for a copyup.
3121 */
3122static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3123{
3124        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3125        int ret;
3126
3127        rbd_assert(obj_req->num_img_extents);
3128        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3129                      rbd_dev->parent_overlap);
3130        if (!obj_req->num_img_extents) {
3131                /*
3132                 * The overlap has become 0 (most likely because the
3133                 * image has been flattened).  Re-submit the original write
3134                 * request -- pass MODS_ONLY since the copyup isn't needed
3135                 * anymore.
3136                 */
3137                return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3138        }
3139
3140        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3141        if (ret)
3142                return ret;
3143
3144        return rbd_obj_read_from_parent(obj_req);
3145}
3146
3147static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3148{
3149        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3150        struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3151        u8 new_state;
3152        u32 i;
3153        int ret;
3154
3155        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3156
3157        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3158                return;
3159
3160        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3161                return;
3162
3163        for (i = 0; i < snapc->num_snaps; i++) {
3164                if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3165                    i + 1 < snapc->num_snaps)
3166                        new_state = OBJECT_EXISTS_CLEAN;
3167                else
3168                        new_state = OBJECT_EXISTS;
3169
3170                ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3171                                            new_state, NULL);
3172                if (ret < 0) {
3173                        obj_req->pending.result = ret;
3174                        return;
3175                }
3176
3177                rbd_assert(!ret);
3178                obj_req->pending.num_pending++;
3179        }
3180}
3181
3182static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3183{
3184        u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3185        int ret;
3186
3187        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3188
3189        /*
3190         * Only send non-zero copyup data to save some I/O and network
3191         * bandwidth -- zero copyup data is equivalent to the object not
3192         * existing.
3193         */
3194        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3195                bytes = 0;
3196
3197        if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3198                /*
3199                 * Send a copyup request with an empty snapshot context to
3200                 * deep-copyup the object through all existing snapshots.
3201                 * A second request with the current snapshot context will be
3202                 * sent for the actual modification.
3203                 */
3204                ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3205                if (ret) {
3206                        obj_req->pending.result = ret;
3207                        return;
3208                }
3209
3210                obj_req->pending.num_pending++;
3211                bytes = MODS_ONLY;
3212        }
3213
3214        ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3215        if (ret) {
3216                obj_req->pending.result = ret;
3217                return;
3218        }
3219
3220        obj_req->pending.num_pending++;
3221}
3222
3223static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3224{
3225        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3226        int ret;
3227
3228again:
3229        switch (obj_req->copyup_state) {
3230        case RBD_OBJ_COPYUP_START:
3231                rbd_assert(!*result);
3232
3233                ret = rbd_obj_copyup_read_parent(obj_req);
3234                if (ret) {
3235                        *result = ret;
3236                        return true;
3237                }
3238                if (obj_req->num_img_extents)
3239                        obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3240                else
3241                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3242                return false;
3243        case RBD_OBJ_COPYUP_READ_PARENT:
3244                if (*result)
3245                        return true;
3246
3247                if (is_zero_bvecs(obj_req->copyup_bvecs,
3248                                  rbd_obj_img_extents_bytes(obj_req))) {
3249                        dout("%s %p detected zeros\n", __func__, obj_req);
3250                        obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3251                }
3252
3253                rbd_obj_copyup_object_maps(obj_req);
3254                if (!obj_req->pending.num_pending) {
3255                        *result = obj_req->pending.result;
3256                        obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3257                        goto again;
3258                }
3259                obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3260                return false;
3261        case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3262                if (!pending_result_dec(&obj_req->pending, result))
3263                        return false;
3264                fallthrough;
3265        case RBD_OBJ_COPYUP_OBJECT_MAPS:
3266                if (*result) {
3267                        rbd_warn(rbd_dev, "snap object map update failed: %d",
3268                                 *result);
3269                        return true;
3270                }
3271
3272                rbd_obj_copyup_write_object(obj_req);
3273                if (!obj_req->pending.num_pending) {
3274                        *result = obj_req->pending.result;
3275                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3276                        goto again;
3277                }
3278                obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3279                return false;
3280        case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3281                if (!pending_result_dec(&obj_req->pending, result))
3282                        return false;
3283                fallthrough;
3284        case RBD_OBJ_COPYUP_WRITE_OBJECT:
3285                return true;
3286        default:
3287                BUG();
3288        }
3289}
3290
3291/*
3292 * Return:
3293 *   0 - object map update sent
3294 *   1 - object map update isn't needed
3295 *  <0 - error
3296 */
3297static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3298{
3299        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3300        u8 current_state = OBJECT_PENDING;
3301
3302        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3303                return 1;
3304
3305        if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3306                return 1;
3307
3308        return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3309                                     &current_state);
3310}
3311
3312static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3313{
3314        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3315        int ret;
3316
3317again:
3318        switch (obj_req->write_state) {
3319        case RBD_OBJ_WRITE_START:
3320                rbd_assert(!*result);
3321
3322                if (rbd_obj_write_is_noop(obj_req))
3323                        return true;
3324
3325                ret = rbd_obj_write_pre_object_map(obj_req);
3326                if (ret < 0) {
3327                        *result = ret;
3328                        return true;
3329                }
3330                obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3331                if (ret > 0)
3332                        goto again;
3333                return false;
3334        case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3335                if (*result) {
3336                        rbd_warn(rbd_dev, "pre object map update failed: %d",
3337                                 *result);
3338                        return true;
3339                }
3340                ret = rbd_obj_write_object(obj_req);
3341                if (ret) {
3342                        *result = ret;
3343                        return true;
3344                }
3345                obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3346                return false;
3347        case RBD_OBJ_WRITE_OBJECT:
3348                if (*result == -ENOENT) {
3349                        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3350                                *result = 0;
3351                                obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3352                                obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3353                                goto again;
3354                        }
3355                        /*
3356                         * On a non-existent object:
3357                         *   delete - -ENOENT, truncate/zero - 0
3358                         */
3359                        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3360                                *result = 0;
3361                }
3362                if (*result)
3363                        return true;
3364
3365                obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3366                goto again;
3367        case __RBD_OBJ_WRITE_COPYUP:
3368                if (!rbd_obj_advance_copyup(obj_req, result))
3369                        return false;
3370                fallthrough;
3371        case RBD_OBJ_WRITE_COPYUP:
3372                if (*result) {
3373                        rbd_warn(rbd_dev, "copyup failed: %d", *result);
3374                        return true;
3375                }
3376                ret = rbd_obj_write_post_object_map(obj_req);
3377                if (ret < 0) {
3378                        *result = ret;
3379                        return true;
3380                }
3381                obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3382                if (ret > 0)
3383                        goto again;
3384                return false;
3385        case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3386                if (*result)
3387                        rbd_warn(rbd_dev, "post object map update failed: %d",
3388                                 *result);
3389                return true;
3390        default:
3391                BUG();
3392        }
3393}
3394
3395/*
3396 * Return true if @obj_req is completed.
3397 */
3398static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3399                                     int *result)
3400{
3401        struct rbd_img_request *img_req = obj_req->img_request;
3402        struct rbd_device *rbd_dev = img_req->rbd_dev;
3403        bool done;
3404
3405        mutex_lock(&obj_req->state_mutex);
3406        if (!rbd_img_is_write(img_req))
3407                done = rbd_obj_advance_read(obj_req, result);
3408        else
3409                done = rbd_obj_advance_write(obj_req, result);
3410        mutex_unlock(&obj_req->state_mutex);
3411
3412        if (done && *result) {
3413                rbd_assert(*result < 0);
3414                rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3415                         obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3416                         obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3417        }
3418        return done;
3419}
3420
3421/*
3422 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3423 * recursion.
3424 */
3425static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3426{
3427        if (__rbd_obj_handle_request(obj_req, &result))
3428                rbd_img_handle_request(obj_req->img_request, result);
3429}
3430
3431static bool need_exclusive_lock(struct rbd_img_request *img_req)
3432{
3433        struct rbd_device *rbd_dev = img_req->rbd_dev;
3434
3435        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3436                return false;
3437
3438        if (rbd_is_ro(rbd_dev))
3439                return false;
3440
3441        rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3442        if (rbd_dev->opts->lock_on_read ||
3443            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3444                return true;
3445
3446        return rbd_img_is_write(img_req);
3447}
3448
3449static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3450{
3451        struct rbd_device *rbd_dev = img_req->rbd_dev;
3452        bool locked;
3453
3454        lockdep_assert_held(&rbd_dev->lock_rwsem);
3455        locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3456        spin_lock(&rbd_dev->lock_lists_lock);
3457        rbd_assert(list_empty(&img_req->lock_item));
3458        if (!locked)
3459                list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3460        else
3461                list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3462        spin_unlock(&rbd_dev->lock_lists_lock);
3463        return locked;
3464}
3465
3466static void rbd_lock_del_request(struct rbd_img_request *img_req)
3467{
3468        struct rbd_device *rbd_dev = img_req->rbd_dev;
3469        bool need_wakeup;
3470
3471        lockdep_assert_held(&rbd_dev->lock_rwsem);
3472        spin_lock(&rbd_dev->lock_lists_lock);
3473        rbd_assert(!list_empty(&img_req->lock_item));
3474        list_del_init(&img_req->lock_item);
3475        need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3476                       list_empty(&rbd_dev->running_list));
3477        spin_unlock(&rbd_dev->lock_lists_lock);
3478        if (need_wakeup)
3479                complete(&rbd_dev->releasing_wait);
3480}
3481
3482static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3483{
3484        struct rbd_device *rbd_dev = img_req->rbd_dev;
3485
3486        if (!need_exclusive_lock(img_req))
3487                return 1;
3488
3489        if (rbd_lock_add_request(img_req))
3490                return 1;
3491
3492        if (rbd_dev->opts->exclusive) {
3493                WARN_ON(1); /* lock got released? */
3494                return -EROFS;
3495        }
3496
3497        /*
3498         * Note the use of mod_delayed_work() in rbd_acquire_lock()
3499         * and cancel_delayed_work() in wake_lock_waiters().
3500         */
3501        dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3502        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3503        return 0;
3504}
3505
3506static void rbd_img_object_requests(struct rbd_img_request *img_req)
3507{
3508        struct rbd_obj_request *obj_req;
3509
3510        rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3511
3512        for_each_obj_request(img_req, obj_req) {
3513                int result = 0;
3514
3515                if (__rbd_obj_handle_request(obj_req, &result)) {
3516                        if (result) {
3517                                img_req->pending.result = result;
3518                                return;
3519                        }
3520                } else {
3521                        img_req->pending.num_pending++;
3522                }
3523        }
3524}
3525
/*
 * Image request state machine.  Runs as many state transitions as can be
 * made without waiting; @result carries the outcome of the previous step
 * in and the outcome of the image request out.
 *
 * Return true if @img_req is completed, false if it has to wait (for the
 * exclusive lock or for outstanding object requests).
 */
static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
{
        struct rbd_device *rbd_dev = img_req->rbd_dev;
        int ret;

again:
        switch (img_req->state) {
        case RBD_IMG_START:
                rbd_assert(!*result);

                ret = rbd_img_exclusive_lock(img_req);
                if (ret < 0) {
                        *result = ret;
                        return true;
                }
                img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
                /* > 0: may proceed right away, 0: wait for the lock */
                if (ret > 0)
                        goto again;
                return false;
        case RBD_IMG_EXCLUSIVE_LOCK:
                /* non-zero result here means the lock wait failed */
                if (*result)
                        return true;

                rbd_assert(!need_exclusive_lock(img_req) ||
                           __rbd_is_lock_owner(rbd_dev));

                rbd_img_object_requests(img_req);
                if (!img_req->pending.num_pending) {
                        /* nothing in flight - finish with recorded result */
                        *result = img_req->pending.result;
                        img_req->state = RBD_IMG_OBJECT_REQUESTS;
                        goto again;
                }
                img_req->state = __RBD_IMG_OBJECT_REQUESTS;
                return false;
        case __RBD_IMG_OBJECT_REQUESTS:
                /*
                 * An object request completed.  Presumably
                 * pending_result_dec() returns true only for the last one
                 * - confirm against its definition.
                 */
                if (!pending_result_dec(&img_req->pending, result))
                        return false;
                fallthrough;
        case RBD_IMG_OBJECT_REQUESTS:
                return true;
        default:
                BUG();
        }
}
3570
3571/*
3572 * Return true if @img_req is completed.
3573 */
3574static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3575                                     int *result)
3576{
3577        struct rbd_device *rbd_dev = img_req->rbd_dev;
3578        bool done;
3579
3580        if (need_exclusive_lock(img_req)) {
3581                down_read(&rbd_dev->lock_rwsem);
3582                mutex_lock(&img_req->state_mutex);
3583                done = rbd_img_advance(img_req, result);
3584                if (done)
3585                        rbd_lock_del_request(img_req);
3586                mutex_unlock(&img_req->state_mutex);
3587                up_read(&rbd_dev->lock_rwsem);
3588        } else {
3589                mutex_lock(&img_req->state_mutex);
3590                done = rbd_img_advance(img_req, result);
3591                mutex_unlock(&img_req->state_mutex);
3592        }
3593
3594        if (done && *result) {
3595                rbd_assert(*result < 0);
3596                rbd_warn(rbd_dev, "%s%s result %d",
3597                      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3598                      obj_op_name(img_req->op_type), *result);
3599        }
3600        return done;
3601}
3602
3603static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3604{
3605again:
3606        if (!__rbd_img_handle_request(img_req, &result))
3607                return;
3608
3609        if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3610                struct rbd_obj_request *obj_req = img_req->obj_request;
3611
3612                rbd_img_request_destroy(img_req);
3613                if (__rbd_obj_handle_request(obj_req, &result)) {
3614                        img_req = obj_req->img_request;
3615                        goto again;
3616                }
3617        } else {
3618                struct request *rq = blk_mq_rq_from_pdu(img_req);
3619
3620                rbd_img_request_destroy(img_req);
3621                blk_mq_end_request(rq, errno_to_blk_status(result));
3622        }
3623}
3624
/*
 * All-zero client id (static storage, hence zero-initialized).  Stored
 * as owner_cid when no owner is known and compared against to detect
 * notifications that carry no client id.
 */
static const struct rbd_client_id rbd_empty_cid;
3626
3627static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3628                          const struct rbd_client_id *rhs)
3629{
3630        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3631}
3632
3633static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3634{
3635        struct rbd_client_id cid;
3636
3637        mutex_lock(&rbd_dev->watch_mutex);
3638        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3639        cid.handle = rbd_dev->watch_cookie;
3640        mutex_unlock(&rbd_dev->watch_mutex);
3641        return cid;
3642}
3643
3644/*
3645 * lock_rwsem must be held for write
3646 */
3647static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3648                              const struct rbd_client_id *cid)
3649{
3650        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3651             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3652             cid->gid, cid->handle);
3653        rbd_dev->owner_cid = *cid; /* struct */
3654}
3655
3656static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3657{
3658        mutex_lock(&rbd_dev->watch_mutex);
3659        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3660        mutex_unlock(&rbd_dev->watch_mutex);
3661}
3662
3663static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3664{
3665        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3666
3667        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3668        strcpy(rbd_dev->lock_cookie, cookie);
3669        rbd_set_owner_cid(rbd_dev, &cid);
3670        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3671}
3672
3673/*
3674 * lock_rwsem must be held for write
3675 */
3676static int rbd_lock(struct rbd_device *rbd_dev)
3677{
3678        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3679        char cookie[32];
3680        int ret;
3681
3682        WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3683                rbd_dev->lock_cookie[0] != '\0');
3684
3685        format_lock_cookie(rbd_dev, cookie);
3686        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3687                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3688                            RBD_LOCK_TAG, "", 0);
3689        if (ret)
3690                return ret;
3691
3692        __rbd_lock(rbd_dev, cookie);
3693        return 0;
3694}
3695
3696/*
3697 * lock_rwsem must be held for write
3698 */
3699static void rbd_unlock(struct rbd_device *rbd_dev)
3700{
3701        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3702        int ret;
3703
3704        WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3705                rbd_dev->lock_cookie[0] == '\0');
3706
3707        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3708                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
3709        if (ret && ret != -ENOENT)
3710                rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3711
3712        /* treat errors as the image is unlocked */
3713        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3714        rbd_dev->lock_cookie[0] = '\0';
3715        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3716        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3717}
3718
3719static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3720                                enum rbd_notify_op notify_op,
3721                                struct page ***preply_pages,
3722                                size_t *preply_len)
3723{
3724        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3725        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3726        char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3727        int buf_size = sizeof(buf);
3728        void *p = buf;
3729
3730        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3731
3732        /* encode *LockPayload NotifyMessage (op + ClientId) */
3733        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3734        ceph_encode_32(&p, notify_op);
3735        ceph_encode_64(&p, cid.gid);
3736        ceph_encode_64(&p, cid.handle);
3737
3738        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3739                                &rbd_dev->header_oloc, buf, buf_size,
3740                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3741}
3742
3743static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3744                               enum rbd_notify_op notify_op)
3745{
3746        __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3747}
3748
3749static void rbd_notify_acquired_lock(struct work_struct *work)
3750{
3751        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3752                                                  acquired_lock_work);
3753
3754        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3755}
3756
3757static void rbd_notify_released_lock(struct work_struct *work)
3758{
3759        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3760                                                  released_lock_work);
3761
3762        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3763}
3764
3765static int rbd_request_lock(struct rbd_device *rbd_dev)
3766{
3767        struct page **reply_pages;
3768        size_t reply_len;
3769        bool lock_owner_responded = false;
3770        int ret;
3771
3772        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3773
3774        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3775                                   &reply_pages, &reply_len);
3776        if (ret && ret != -ETIMEDOUT) {
3777                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3778                goto out;
3779        }
3780
3781        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3782                void *p = page_address(reply_pages[0]);
3783                void *const end = p + reply_len;
3784                u32 n;
3785
3786                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3787                while (n--) {
3788                        u8 struct_v;
3789                        u32 len;
3790
3791                        ceph_decode_need(&p, end, 8 + 8, e_inval);
3792                        p += 8 + 8; /* skip gid and cookie */
3793
3794                        ceph_decode_32_safe(&p, end, len, e_inval);
3795                        if (!len)
3796                                continue;
3797
3798                        if (lock_owner_responded) {
3799                                rbd_warn(rbd_dev,
3800                                         "duplicate lock owners detected");
3801                                ret = -EIO;
3802                                goto out;
3803                        }
3804
3805                        lock_owner_responded = true;
3806                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3807                                                  &struct_v, &len);
3808                        if (ret) {
3809                                rbd_warn(rbd_dev,
3810                                         "failed to decode ResponseMessage: %d",
3811                                         ret);
3812                                goto e_inval;
3813                        }
3814
3815                        ret = ceph_decode_32(&p);
3816                }
3817        }
3818
3819        if (!lock_owner_responded) {
3820                rbd_warn(rbd_dev, "no lock owners detected");
3821                ret = -ETIMEDOUT;
3822        }
3823
3824out:
3825        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3826        return ret;
3827
3828e_inval:
3829        ret = -EINVAL;
3830        goto out;
3831}
3832
3833/*
3834 * Either image request state machine(s) or rbd_add_acquire_lock()
3835 * (i.e. "rbd map").
3836 */
3837static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3838{
3839        struct rbd_img_request *img_req;
3840
3841        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3842        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3843
3844        cancel_delayed_work(&rbd_dev->lock_dwork);
3845        if (!completion_done(&rbd_dev->acquire_wait)) {
3846                rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3847                           list_empty(&rbd_dev->running_list));
3848                rbd_dev->acquire_err = result;
3849                complete_all(&rbd_dev->acquire_wait);
3850                return;
3851        }
3852
3853        list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3854                mutex_lock(&img_req->state_mutex);
3855                rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3856                rbd_img_schedule(img_req, result);
3857                mutex_unlock(&img_req->state_mutex);
3858        }
3859
3860        list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3861}
3862
3863static int get_lock_owner_info(struct rbd_device *rbd_dev,
3864                               struct ceph_locker **lockers, u32 *num_lockers)
3865{
3866        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3867        u8 lock_type;
3868        char *lock_tag;
3869        int ret;
3870
3871        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3872
3873        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3874                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3875                                 &lock_type, &lock_tag, lockers, num_lockers);
3876        if (ret)
3877                return ret;
3878
3879        if (*num_lockers == 0) {
3880                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3881                goto out;
3882        }
3883
3884        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3885                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3886                         lock_tag);
3887                ret = -EBUSY;
3888                goto out;
3889        }
3890
3891        if (lock_type == CEPH_CLS_LOCK_SHARED) {
3892                rbd_warn(rbd_dev, "shared lock type detected");
3893                ret = -EBUSY;
3894                goto out;
3895        }
3896
3897        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3898                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3899                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3900                         (*lockers)[0].id.cookie);
3901                ret = -EBUSY;
3902                goto out;
3903        }
3904
3905out:
3906        kfree(lock_tag);
3907        return ret;
3908}
3909
3910static int find_watcher(struct rbd_device *rbd_dev,
3911                        const struct ceph_locker *locker)
3912{
3913        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3914        struct ceph_watch_item *watchers;
3915        u32 num_watchers;
3916        u64 cookie;
3917        int i;
3918        int ret;
3919
3920        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3921                                      &rbd_dev->header_oloc, &watchers,
3922                                      &num_watchers);
3923        if (ret)
3924                return ret;
3925
3926        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3927        for (i = 0; i < num_watchers; i++) {
3928                /*
3929                 * Ignore addr->type while comparing.  This mimics
3930                 * entity_addr_t::get_legacy_str() + strcmp().
3931                 */
3932                if (ceph_addr_equal_no_type(&watchers[i].addr,
3933                                            &locker->info.addr) &&
3934                    watchers[i].cookie == cookie) {
3935                        struct rbd_client_id cid = {
3936                                .gid = le64_to_cpu(watchers[i].name.num),
3937                                .handle = cookie,
3938                        };
3939
3940                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3941                             rbd_dev, cid.gid, cid.handle);
3942                        rbd_set_owner_cid(rbd_dev, &cid);
3943                        ret = 1;
3944                        goto out;
3945                }
3946        }
3947
3948        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3949        ret = 0;
3950out:
3951        kfree(watchers);
3952        return ret;
3953}
3954
3955/*
3956 * lock_rwsem must be held for write
3957 */
3958static int rbd_try_lock(struct rbd_device *rbd_dev)
3959{
3960        struct ceph_client *client = rbd_dev->rbd_client->client;
3961        struct ceph_locker *lockers;
3962        u32 num_lockers;
3963        int ret;
3964
3965        for (;;) {
3966                ret = rbd_lock(rbd_dev);
3967                if (ret != -EBUSY)
3968                        return ret;
3969
3970                /* determine if the current lock holder is still alive */
3971                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3972                if (ret)
3973                        return ret;
3974
3975                if (num_lockers == 0)
3976                        goto again;
3977
3978                ret = find_watcher(rbd_dev, lockers);
3979                if (ret)
3980                        goto out; /* request lock or error */
3981
3982                rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3983                         ENTITY_NAME(lockers[0].id.name));
3984
3985                ret = ceph_monc_blocklist_add(&client->monc,
3986                                              &lockers[0].info.addr);
3987                if (ret) {
3988                        rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3989                                 ENTITY_NAME(lockers[0].id.name), ret);
3990                        goto out;
3991                }
3992
3993                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3994                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
3995                                          lockers[0].id.cookie,
3996                                          &lockers[0].id.name);
3997                if (ret && ret != -ENOENT)
3998                        goto out;
3999
4000again:
4001                ceph_free_lockers(lockers, num_lockers);
4002        }
4003
4004out:
4005        ceph_free_lockers(lockers, num_lockers);
4006        return ret;
4007}
4008
4009static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4010{
4011        int ret;
4012
4013        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4014                ret = rbd_object_map_open(rbd_dev);
4015                if (ret)
4016                        return ret;
4017        }
4018
4019        return 0;
4020}
4021
/*
 * Acquire the exclusive lock unless we already own it.  Takes
 * lock_rwsem itself: for read for a quick ownership check first, then
 * for write for the actual attempt.  Lock waiters are woken with the
 * outcome, except when the caller is told to request the lock.
 *
 * Return:
 *   0 - lock acquired
 *   1 - caller should call rbd_request_lock()
 *  <0 - error
 */
static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
{
        int ret;

        down_read(&rbd_dev->lock_rwsem);
        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
             rbd_dev->lock_state);
        if (__rbd_is_lock_owner(rbd_dev)) {
                up_read(&rbd_dev->lock_rwsem);
                return 0;
        }

        /*
         * The rwsem is dropped before it is retaken for write, so
         * ownership has to be checked again.
         */
        up_read(&rbd_dev->lock_rwsem);
        down_write(&rbd_dev->lock_rwsem);
        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
             rbd_dev->lock_state);
        if (__rbd_is_lock_owner(rbd_dev)) {
                up_write(&rbd_dev->lock_rwsem);
                return 0;
        }

        ret = rbd_try_lock(rbd_dev);
        if (ret < 0) {
                rbd_warn(rbd_dev, "failed to lock header: %d", ret);
                /* being blocklisted is the one error that is passed on */
                if (ret == -EBLOCKLISTED)
                        goto out;

                ret = 1; /* request lock anyway */
        }
        if (ret > 0) {
                /* waiters stay parked until the lock request is resolved */
                up_write(&rbd_dev->lock_rwsem);
                return ret;
        }

        rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
        rbd_assert(list_empty(&rbd_dev->running_list));

        ret = rbd_post_acquire_action(rbd_dev);
        if (ret) {
                rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
                /*
                 * Can't stay in RBD_LOCK_STATE_LOCKED because
                 * rbd_lock_add_request() would let the request through,
                 * assuming that e.g. object map is locked and loaded.
                 */
                rbd_unlock(rbd_dev);
        }

out:
        /* reached with lock_rwsem held for write and ret <= 0 */
        wake_lock_waiters(rbd_dev, ret);
        up_write(&rbd_dev->lock_rwsem);
        return ret;
}
4081
4082static void rbd_acquire_lock(struct work_struct *work)
4083{
4084        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4085                                            struct rbd_device, lock_dwork);
4086        int ret;
4087
4088        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4089again:
4090        ret = rbd_try_acquire_lock(rbd_dev);
4091        if (ret <= 0) {
4092                dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4093                return;
4094        }
4095
4096        ret = rbd_request_lock(rbd_dev);
4097        if (ret == -ETIMEDOUT) {
4098                goto again; /* treat this as a dead client */
4099        } else if (ret == -EROFS) {
4100                rbd_warn(rbd_dev, "peer will not release lock");
4101                down_write(&rbd_dev->lock_rwsem);
4102                wake_lock_waiters(rbd_dev, ret);
4103                up_write(&rbd_dev->lock_rwsem);
4104        } else if (ret < 0) {
4105                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4106                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4107                                 RBD_RETRY_DELAY);
4108        } else {
4109                /*
4110                 * lock owner acked, but resend if we don't see them
4111                 * release the lock
4112                 */
4113                dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4114                     rbd_dev);
4115                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4116                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4117        }
4118}
4119
/*
 * Stop new requests from entering running_list and wait for the ones on
 * it to finish, in preparation for releasing the lock.
 *
 * Called with lock_rwsem held for write and returns with it held for
 * write, but the rwsem is dropped and retaken in between.
 *
 * Return true if the lock is still in RBD_LOCK_STATE_RELEASING and
 * running_list is empty, i.e. the caller may go ahead and release.
 * Return false if the lock was not in RBD_LOCK_STATE_LOCKED to begin
 * with or its state changed while the rwsem was not held.
 */
static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
{
        bool need_wait;

        dout("%s rbd_dev %p\n", __func__, rbd_dev);
        lockdep_assert_held_write(&rbd_dev->lock_rwsem);

        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
                return false;

        /*
         * Ensure that all in-flight IO is flushed.
         */
        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
        rbd_assert(!completion_done(&rbd_dev->releasing_wait));
        need_wait = !list_empty(&rbd_dev->running_list);
        /*
         * rbd_lock_del_request() asserts that lock_rwsem is held and is
         * called with it held for read, hence the downgrade before
         * waiting for it to complete releasing_wait.
         */
        downgrade_write(&rbd_dev->lock_rwsem);
        if (need_wait)
                wait_for_completion(&rbd_dev->releasing_wait);
        up_read(&rbd_dev->lock_rwsem);

        /* the rwsem was dropped above, so the state has to be rechecked */
        down_write(&rbd_dev->lock_rwsem);
        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
                return false;

        rbd_assert(list_empty(&rbd_dev->running_list));
        return true;
}
4148
4149static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4150{
4151        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4152                rbd_object_map_close(rbd_dev);
4153}
4154
/*
 * Release the exclusive lock: run the pre-release action and then
 * unlock the header.  No request may be on running_list anymore.
 */
static void __rbd_release_lock(struct rbd_device *rbd_dev)
{
        rbd_assert(list_empty(&rbd_dev->running_list));

        rbd_pre_release_action(rbd_dev);
        rbd_unlock(rbd_dev);
}
4162
4163/*
4164 * lock_rwsem must be held for write
4165 */
4166static void rbd_release_lock(struct rbd_device *rbd_dev)
4167{
4168        if (!rbd_quiesce_lock(rbd_dev))
4169                return;
4170
4171        __rbd_release_lock(rbd_dev);
4172
4173        /*
4174         * Give others a chance to grab the lock - we would re-acquire
4175         * almost immediately if we got new IO while draining the running
4176         * list otherwise.  We need to ack our own notifications, so this
4177         * lock_dwork will be requeued from rbd_handle_released_lock() by
4178         * way of maybe_kick_acquire().
4179         */
4180        cancel_delayed_work(&rbd_dev->lock_dwork);
4181}
4182
4183static void rbd_release_lock_work(struct work_struct *work)
4184{
4185        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4186                                                  unlock_work);
4187
4188        down_write(&rbd_dev->lock_rwsem);
4189        rbd_release_lock(rbd_dev);
4190        up_write(&rbd_dev->lock_rwsem);
4191}
4192
4193static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4194{
4195        bool have_requests;
4196
4197        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4198        if (__rbd_is_lock_owner(rbd_dev))
4199                return;
4200
4201        spin_lock(&rbd_dev->lock_lists_lock);
4202        have_requests = !list_empty(&rbd_dev->acquiring_list);
4203        spin_unlock(&rbd_dev->lock_lists_lock);
4204        if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4205                dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4206                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4207        }
4208}
4209
4210static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4211                                     void **p)
4212{
4213        struct rbd_client_id cid = { 0 };
4214
4215        if (struct_v >= 2) {
4216                cid.gid = ceph_decode_64(p);
4217                cid.handle = ceph_decode_64(p);
4218        }
4219
4220        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4221             cid.handle);
4222        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4223                down_write(&rbd_dev->lock_rwsem);
4224                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4225                        /*
4226                         * we already know that the remote client is
4227                         * the owner
4228                         */
4229                        up_write(&rbd_dev->lock_rwsem);
4230                        return;
4231                }
4232
4233                rbd_set_owner_cid(rbd_dev, &cid);
4234                downgrade_write(&rbd_dev->lock_rwsem);
4235        } else {
4236                down_read(&rbd_dev->lock_rwsem);
4237        }
4238
4239        maybe_kick_acquire(rbd_dev);
4240        up_read(&rbd_dev->lock_rwsem);
4241}
4242
4243static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4244                                     void **p)
4245{
4246        struct rbd_client_id cid = { 0 };
4247
4248        if (struct_v >= 2) {
4249                cid.gid = ceph_decode_64(p);
4250                cid.handle = ceph_decode_64(p);
4251        }
4252
4253        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4254             cid.handle);
4255        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4256                down_write(&rbd_dev->lock_rwsem);
4257                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4258                        dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4259                             __func__, rbd_dev, cid.gid, cid.handle,
4260                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4261                        up_write(&rbd_dev->lock_rwsem);
4262                        return;
4263                }
4264
4265                rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4266                downgrade_write(&rbd_dev->lock_rwsem);
4267        } else {
4268                down_read(&rbd_dev->lock_rwsem);
4269        }
4270
4271        maybe_kick_acquire(rbd_dev);
4272        up_read(&rbd_dev->lock_rwsem);
4273}
4274
4275/*
4276 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4277 * ResponseMessage is needed.
4278 */
4279static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4280                                   void **p)
4281{
4282        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4283        struct rbd_client_id cid = { 0 };
4284        int result = 1;
4285
4286        if (struct_v >= 2) {
4287                cid.gid = ceph_decode_64(p);
4288                cid.handle = ceph_decode_64(p);
4289        }
4290
4291        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4292             cid.handle);
4293        if (rbd_cid_equal(&cid, &my_cid))
4294                return result;
4295
4296        down_read(&rbd_dev->lock_rwsem);
4297        if (__rbd_is_lock_owner(rbd_dev)) {
4298                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4299                    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4300                        goto out_unlock;
4301
4302                /*
4303                 * encode ResponseMessage(0) so the peer can detect
4304                 * a missing owner
4305                 */
4306                result = 0;
4307
4308                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4309                        if (!rbd_dev->opts->exclusive) {
4310                                dout("%s rbd_dev %p queueing unlock_work\n",
4311                                     __func__, rbd_dev);
4312                                queue_work(rbd_dev->task_wq,
4313                                           &rbd_dev->unlock_work);
4314                        } else {
4315                                /* refuse to release the lock */
4316                                result = -EROFS;
4317                        }
4318                }
4319        }
4320
4321out_unlock:
4322        up_read(&rbd_dev->lock_rwsem);
4323        return result;
4324}
4325
4326static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4327                                     u64 notify_id, u64 cookie, s32 *result)
4328{
4329        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4330        char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4331        int buf_size = sizeof(buf);
4332        int ret;
4333
4334        if (result) {
4335                void *p = buf;
4336
4337                /* encode ResponseMessage */
4338                ceph_start_encoding(&p, 1, 1,
4339                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
4340                ceph_encode_32(&p, *result);
4341        } else {
4342                buf_size = 0;
4343        }
4344
4345        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4346                                   &rbd_dev->header_oloc, notify_id, cookie,
4347                                   buf, buf_size);
4348        if (ret)
4349                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4350}
4351
4352static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4353                                   u64 cookie)
4354{
4355        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4356        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4357}
4358
4359static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4360                                          u64 notify_id, u64 cookie, s32 result)
4361{
4362        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4363        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4364}
4365
4366static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4367                         u64 notifier_id, void *data, size_t data_len)
4368{
4369        struct rbd_device *rbd_dev = arg;
4370        void *p = data;
4371        void *const end = p + data_len;
4372        u8 struct_v = 0;
4373        u32 len;
4374        u32 notify_op;
4375        int ret;
4376
4377        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4378             __func__, rbd_dev, cookie, notify_id, data_len);
4379        if (data_len) {
4380                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4381                                          &struct_v, &len);
4382                if (ret) {
4383                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4384                                 ret);
4385                        return;
4386                }
4387
4388                notify_op = ceph_decode_32(&p);
4389        } else {
4390                /* legacy notification for header updates */
4391                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4392                len = 0;
4393        }
4394
4395        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4396        switch (notify_op) {
4397        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4398                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4399                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4400                break;
4401        case RBD_NOTIFY_OP_RELEASED_LOCK:
4402                rbd_handle_released_lock(rbd_dev, struct_v, &p);
4403                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4404                break;
4405        case RBD_NOTIFY_OP_REQUEST_LOCK:
4406                ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4407                if (ret <= 0)
4408                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4409                                                      cookie, ret);
4410                else
4411                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4412                break;
4413        case RBD_NOTIFY_OP_HEADER_UPDATE:
4414                ret = rbd_dev_refresh(rbd_dev);
4415                if (ret)
4416                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
4417
4418                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4419                break;
4420        default:
4421                if (rbd_is_lock_owner(rbd_dev))
4422                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4423                                                      cookie, -EOPNOTSUPP);
4424                else
4425                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4426                break;
4427        }
4428}
4429
4430static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4431
4432static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4433{
4434        struct rbd_device *rbd_dev = arg;
4435
4436        rbd_warn(rbd_dev, "encountered watch error: %d", err);
4437
4438        down_write(&rbd_dev->lock_rwsem);
4439        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4440        up_write(&rbd_dev->lock_rwsem);
4441
4442        mutex_lock(&rbd_dev->watch_mutex);
4443        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4444                __rbd_unregister_watch(rbd_dev);
4445                rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4446
4447                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4448        }
4449        mutex_unlock(&rbd_dev->watch_mutex);
4450}
4451
4452/*
4453 * watch_mutex must be locked
4454 */
4455static int __rbd_register_watch(struct rbd_device *rbd_dev)
4456{
4457        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4458        struct ceph_osd_linger_request *handle;
4459
4460        rbd_assert(!rbd_dev->watch_handle);
4461        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4462
4463        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4464                                 &rbd_dev->header_oloc, rbd_watch_cb,
4465                                 rbd_watch_errcb, rbd_dev);
4466        if (IS_ERR(handle))
4467                return PTR_ERR(handle);
4468
4469        rbd_dev->watch_handle = handle;
4470        return 0;
4471}
4472
4473/*
4474 * watch_mutex must be locked
4475 */
4476static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4477{
4478        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4479        int ret;
4480
4481        rbd_assert(rbd_dev->watch_handle);
4482        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4483
4484        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4485        if (ret)
4486                rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4487
4488        rbd_dev->watch_handle = NULL;
4489}
4490
4491static int rbd_register_watch(struct rbd_device *rbd_dev)
4492{
4493        int ret;
4494
4495        mutex_lock(&rbd_dev->watch_mutex);
4496        rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4497        ret = __rbd_register_watch(rbd_dev);
4498        if (ret)
4499                goto out;
4500
4501        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4502        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4503
4504out:
4505        mutex_unlock(&rbd_dev->watch_mutex);
4506        return ret;
4507}
4508
/*
 * Synchronously cancel four of the device's work items:
 * acquired_lock_work, released_lock_work, lock_dwork and unlock_work.
 * watch_dwork is not cancelled here.
 */
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
        dout("%s rbd_dev %p\n", __func__, rbd_dev);

        cancel_work_sync(&rbd_dev->acquired_lock_work);
        cancel_work_sync(&rbd_dev->released_lock_work);
        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
        cancel_work_sync(&rbd_dev->unlock_work);
}
4518
/*
 * Tear down the watch: cancel the work items handled by
 * cancel_tasks_sync(), unregister the watch if it is currently
 * registered, mark it unregistered, then cancel watch_dwork and flush
 * notifies.
 *
 * header_rwsem must not be held to avoid a deadlock with
 * rbd_dev_refresh() when flushing notifies.
 */
static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
        cancel_tasks_sync(rbd_dev);

        mutex_lock(&rbd_dev->watch_mutex);
        /* in any other state there is nothing to unwatch */
        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
                __rbd_unregister_watch(rbd_dev);
        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
        mutex_unlock(&rbd_dev->watch_mutex);

        /*
         * rbd_reregister_watch() returns early unless the state is
         * ERROR, so with the state now UNREGISTERED a pending
         * watch_dwork run will not re-establish the watch.
         */
        cancel_delayed_work_sync(&rbd_dev->watch_dwork);
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
4536
4537/*
4538 * lock_rwsem must be held for write
4539 */
4540static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4541{
4542        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4543        char cookie[32];
4544        int ret;
4545
4546        if (!rbd_quiesce_lock(rbd_dev))
4547                return;
4548
4549        format_lock_cookie(rbd_dev, cookie);
4550        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4551                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4552                                  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4553                                  RBD_LOCK_TAG, cookie);
4554        if (ret) {
4555                if (ret != -EOPNOTSUPP)
4556                        rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4557                                 ret);
4558
4559                /*
4560                 * Lock cookie cannot be updated on older OSDs, so do
4561                 * a manual release and queue an acquire.
4562                 */
4563                __rbd_release_lock(rbd_dev);
4564                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4565        } else {
4566                __rbd_lock(rbd_dev, cookie);
4567                wake_lock_waiters(rbd_dev, 0);
4568        }
4569}
4570
4571static void rbd_reregister_watch(struct work_struct *work)
4572{
4573        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4574                                            struct rbd_device, watch_dwork);
4575        int ret;
4576
4577        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4578
4579        mutex_lock(&rbd_dev->watch_mutex);
4580        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4581                mutex_unlock(&rbd_dev->watch_mutex);
4582                return;
4583        }
4584
4585        ret = __rbd_register_watch(rbd_dev);
4586        if (ret) {
4587                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4588                if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4589                        queue_delayed_work(rbd_dev->task_wq,
4590                                           &rbd_dev->watch_dwork,
4591                                           RBD_RETRY_DELAY);
4592                        mutex_unlock(&rbd_dev->watch_mutex);
4593                        return;
4594                }
4595
4596                mutex_unlock(&rbd_dev->watch_mutex);
4597                down_write(&rbd_dev->lock_rwsem);
4598                wake_lock_waiters(rbd_dev, ret);
4599                up_write(&rbd_dev->lock_rwsem);
4600                return;
4601        }
4602
4603        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4604        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4605        mutex_unlock(&rbd_dev->watch_mutex);
4606
4607        down_write(&rbd_dev->lock_rwsem);
4608        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4609                rbd_reacquire_lock(rbd_dev);
4610        up_write(&rbd_dev->lock_rwsem);
4611
4612        ret = rbd_dev_refresh(rbd_dev);
4613        if (ret)
4614                rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4615}
4616
4617/*
4618 * Synchronous osd object method call.  Returns the number of bytes
4619 * returned in the outbound buffer, or a negative error code.
4620 */
4621static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4622                             struct ceph_object_id *oid,
4623                             struct ceph_object_locator *oloc,
4624                             const char *method_name,
4625                             const void *outbound,
4626                             size_t outbound_size,
4627                             void *inbound,
4628                             size_t inbound_size)
4629{
4630        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4631        struct page *req_page = NULL;
4632        struct page *reply_page;
4633        int ret;
4634
4635        /*
4636         * Method calls are ultimately read operations.  The result
4637         * should placed into the inbound buffer provided.  They
4638         * also supply outbound data--parameters for the object
4639         * method.  Currently if this is present it will be a
4640         * snapshot id.
4641         */
4642        if (outbound) {
4643                if (outbound_size > PAGE_SIZE)
4644                        return -E2BIG;
4645
4646                req_page = alloc_page(GFP_KERNEL);
4647                if (!req_page)
4648                        return -ENOMEM;
4649
4650                memcpy(page_address(req_page), outbound, outbound_size);
4651        }
4652
4653        reply_page = alloc_page(GFP_KERNEL);
4654        if (!reply_page) {
4655                if (req_page)
4656                        __free_page(req_page);
4657                return -ENOMEM;
4658        }
4659
4660        ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4661                             CEPH_OSD_FLAG_READ, req_page, outbound_size,
4662                             &reply_page, &inbound_size);
4663        if (!ret) {
4664                memcpy(inbound, page_address(reply_page), inbound_size);
4665                ret = inbound_size;
4666        }
4667
4668        if (req_page)
4669                __free_page(req_page);
4670        __free_page(reply_page);
4671        return ret;
4672}
4673
4674static void rbd_queue_workfn(struct work_struct *work)
4675{
4676        struct rbd_img_request *img_request =
4677            container_of(work, struct rbd_img_request, work);
4678        struct rbd_device *rbd_dev = img_request->rbd_dev;
4679        enum obj_operation_type op_type = img_request->op_type;
4680        struct request *rq = blk_mq_rq_from_pdu(img_request);
4681        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4682        u64 length = blk_rq_bytes(rq);
4683        u64 mapping_size;
4684        int result;
4685
4686        /* Ignore/skip any zero-length requests */
4687        if (!length) {
4688                dout("%s: zero-length request\n", __func__);
4689                result = 0;
4690                goto err_img_request;
4691        }
4692
4693        blk_mq_start_request(rq);
4694
4695        down_read(&rbd_dev->header_rwsem);
4696        mapping_size = rbd_dev->mapping.size;
4697        rbd_img_capture_header(img_request);
4698        up_read(&rbd_dev->header_rwsem);
4699
4700        if (offset + length > mapping_size) {
4701                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4702                         length, mapping_size);
4703                result = -EIO;
4704                goto err_img_request;
4705        }
4706
4707        dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4708             img_request, obj_op_name(op_type), offset, length);
4709
4710        if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4711                result = rbd_img_fill_nodata(img_request, offset, length);
4712        else
4713                result = rbd_img_fill_from_bio(img_request, offset, length,
4714                                               rq->bio);
4715        if (result)
4716                goto err_img_request;
4717
4718        rbd_img_handle_request(img_request, 0);
4719        return;
4720
4721err_img_request:
4722        rbd_img_request_destroy(img_request);
4723        if (result)
4724                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4725                         obj_op_name(op_type), length, offset, result);
4726        blk_mq_end_request(rq, errno_to_blk_status(result));
4727}
4728
4729static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4730                const struct blk_mq_queue_data *bd)
4731{
4732        struct rbd_device *rbd_dev = hctx->queue->queuedata;
4733        struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4734        enum obj_operation_type op_type;
4735
4736        switch (req_op(bd->rq)) {
4737        case REQ_OP_DISCARD:
4738                op_type = OBJ_OP_DISCARD;
4739                break;
4740        case REQ_OP_WRITE_ZEROES:
4741                op_type = OBJ_OP_ZEROOUT;
4742                break;
4743        case REQ_OP_WRITE:
4744                op_type = OBJ_OP_WRITE;
4745                break;
4746        case REQ_OP_READ:
4747                op_type = OBJ_OP_READ;
4748                break;
4749        default:
4750                rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4751                return BLK_STS_IOERR;
4752        }
4753
4754        rbd_img_request_init(img_req, rbd_dev, op_type);
4755
4756        if (rbd_img_is_write(img_req)) {
4757                if (rbd_is_ro(rbd_dev)) {
4758                        rbd_warn(rbd_dev, "%s on read-only mapping",
4759                                 obj_op_name(img_req->op_type));
4760                        return BLK_STS_IOERR;
4761                }
4762                rbd_assert(!rbd_is_snap(rbd_dev));
4763        }
4764
4765        INIT_WORK(&img_req->work, rbd_queue_workfn);
4766        queue_work(rbd_wq, &img_req->work);
4767        return BLK_STS_OK;
4768}
4769
4770static void rbd_free_disk(struct rbd_device *rbd_dev)
4771{
4772        blk_cleanup_queue(rbd_dev->disk->queue);
4773        blk_mq_free_tag_set(&rbd_dev->tag_set);
4774        put_disk(rbd_dev->disk);
4775        rbd_dev->disk = NULL;
4776}
4777
4778static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4779                             struct ceph_object_id *oid,
4780                             struct ceph_object_locator *oloc,
4781                             void *buf, int buf_len)
4782
4783{
4784        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4785        struct ceph_osd_request *req;
4786        struct page **pages;
4787        int num_pages = calc_pages_for(0, buf_len);
4788        int ret;
4789
4790        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4791        if (!req)
4792                return -ENOMEM;
4793
4794        ceph_oid_copy(&req->r_base_oid, oid);
4795        ceph_oloc_copy(&req->r_base_oloc, oloc);
4796        req->r_flags = CEPH_OSD_FLAG_READ;
4797
4798        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4799        if (IS_ERR(pages)) {
4800                ret = PTR_ERR(pages);
4801                goto out_req;
4802        }
4803
4804        osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4805        osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4806                                         true);
4807
4808        ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4809        if (ret)
4810                goto out_req;
4811
4812        ceph_osdc_start_request(osdc, req, false);
4813        ret = ceph_osdc_wait_request(osdc, req);
4814        if (ret >= 0)
4815                ceph_copy_from_page_vector(pages, buf, 0, ret);
4816
4817out_req:
4818        ceph_osdc_put_request(req);
4819        return ret;
4820}
4821
4822/*
4823 * Read the complete header for the given rbd device.  On successful
4824 * return, the rbd_dev->header field will contain up-to-date
4825 * information about the image.
4826 */
4827static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4828{
4829        struct rbd_image_header_ondisk *ondisk = NULL;
4830        u32 snap_count = 0;
4831        u64 names_size = 0;
4832        u32 want_count;
4833        int ret;
4834
4835        /*
4836         * The complete header will include an array of its 64-bit
4837         * snapshot ids, followed by the names of those snapshots as
4838         * a contiguous block of NUL-terminated strings.  Note that
4839         * the number of snapshots could change by the time we read
4840         * it in, in which case we re-read it.
4841         */
4842        do {
4843                size_t size;
4844
4845                kfree(ondisk);
4846
4847                size = sizeof (*ondisk);
4848                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4849                size += names_size;
4850                ondisk = kmalloc(size, GFP_KERNEL);
4851                if (!ondisk)
4852                        return -ENOMEM;
4853
4854                ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4855                                        &rbd_dev->header_oloc, ondisk, size);
4856                if (ret < 0)
4857                        goto out;
4858                if ((size_t)ret < size) {
4859                        ret = -ENXIO;
4860                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4861                                size, ret);
4862                        goto out;
4863                }
4864                if (!rbd_dev_ondisk_valid(ondisk)) {
4865                        ret = -ENXIO;
4866                        rbd_warn(rbd_dev, "invalid header");
4867                        goto out;
4868                }
4869
4870                names_size = le64_to_cpu(ondisk->snap_names_len);
4871                want_count = snap_count;
4872                snap_count = le32_to_cpu(ondisk->snap_count);
4873        } while (snap_count != want_count);
4874
4875        ret = rbd_header_from_disk(rbd_dev, ondisk);
4876out:
4877        kfree(ondisk);
4878
4879        return ret;
4880}
4881
4882static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4883{
4884        sector_t size;
4885
4886        /*
4887         * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4888         * try to update its size.  If REMOVING is set, updating size
4889         * is just useless work since the device can't be opened.
4890         */
4891        if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4892            !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4893                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4894                dout("setting size to %llu sectors", (unsigned long long)size);
4895                set_capacity_and_notify(rbd_dev->disk, size);
4896        }
4897}
4898
4899static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4900{
4901        u64 mapping_size;
4902        int ret;
4903
4904        down_write(&rbd_dev->header_rwsem);
4905        mapping_size = rbd_dev->mapping.size;
4906
4907        ret = rbd_dev_header_info(rbd_dev);
4908        if (ret)
4909                goto out;
4910
4911        /*
4912         * If there is a parent, see if it has disappeared due to the
4913         * mapped image getting flattened.
4914         */
4915        if (rbd_dev->parent) {
4916                ret = rbd_dev_v2_parent_info(rbd_dev);
4917                if (ret)
4918                        goto out;
4919        }
4920
4921        rbd_assert(!rbd_is_snap(rbd_dev));
4922        rbd_dev->mapping.size = rbd_dev->header.image_size;
4923
4924out:
4925        up_write(&rbd_dev->header_rwsem);
4926        if (!ret && mapping_size != rbd_dev->mapping.size)
4927                rbd_dev_update_size(rbd_dev);
4928
4929        return ret;
4930}
4931
/* blk-mq operations for rbd; only ->queue_rq is provided. */
static const struct blk_mq_ops rbd_mq_ops = {
        .queue_rq       = rbd_queue_rq,
};
4935
/*
 * Allocate and configure the gendisk, the blk-mq tag set and the
 * request queue for @rbd_dev.
 *
 * On success the disk is stored in rbd_dev->disk and 0 is returned.
 * On failure whatever was allocated so far is released and an error is
 * returned (-ENOMEM if the disk itself cannot be allocated).
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        /*
         * NOTE(review): the product is stored in an unsigned int --
         * confirm it cannot overflow for the layouts that are accepted.
         */
        unsigned int objset_bytes =
            rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
        int err;

        /* create gendisk info */
        disk = alloc_disk(single_major ?
                          (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
                          RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = rbd_dev->minor;
        if (single_major)
                disk->flags |= GENHD_FL_EXT_DEVT;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* each request carries a struct rbd_img_request as its pdu */
        memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
        rbd_dev->tag_set.ops = &rbd_mq_ops;
        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
        rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
        rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
        rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);

        err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
        if (err)
                goto out_disk;

        q = blk_mq_init_queue(&rbd_dev->tag_set);
        if (IS_ERR(q)) {
                err = PTR_ERR(q);
                goto out_tag_set;
        }

        blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
        /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */

        /* cap a single request at object_size * stripe_count bytes */
        blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
        q->limits.max_sectors = queue_max_hw_sectors(q);
        blk_queue_max_segments(q, USHRT_MAX);
        blk_queue_max_segment_size(q, UINT_MAX);
        blk_queue_io_min(q, rbd_dev->opts->alloc_size);
        blk_queue_io_opt(q, rbd_dev->opts->alloc_size);

        /* discard and write-zeroes limits are only set with the trim option */
        if (rbd_dev->opts->trim) {
                blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
                q->limits.discard_granularity = rbd_dev->opts->alloc_size;
                blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
                blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
        }

        /* stable writes are requested unless the client has NOCRC set */
        if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
                blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);

        /*
         * disk_release() expects a queue ref from add_disk() and will
         * put it.  Hold an extra ref until add_disk() is called.
         */
        WARN_ON(!blk_get_queue(q));
        disk->queue = q;
        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        return 0;
out_tag_set:
        blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
        put_disk(disk);
        return err;
}
5015
5016/*
5017  sysfs
5018*/
5019
5020static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5021{
5022        return container_of(dev, struct rbd_device, dev);
5023}
5024
5025static ssize_t rbd_size_show(struct device *dev,
5026                             struct device_attribute *attr, char *buf)
5027{
5028        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5029
5030        return sprintf(buf, "%llu\n",
5031                (unsigned long long)rbd_dev->mapping.size);
5032}
5033
5034static ssize_t rbd_features_show(struct device *dev,
5035                             struct device_attribute *attr, char *buf)
5036{
5037        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5038
5039        return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5040}
5041
5042static ssize_t rbd_major_show(struct device *dev,
5043                              struct device_attribute *attr, char *buf)
5044{
5045        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5046
5047        if (rbd_dev->major)
5048                return sprintf(buf, "%d\n", rbd_dev->major);
5049
5050        return sprintf(buf, "(none)\n");
5051}
5052
5053static ssize_t rbd_minor_show(struct device *dev,
5054                              struct device_attribute *attr, char *buf)
5055{
5056        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5057
5058        return sprintf(buf, "%d\n", rbd_dev->minor);
5059}
5060
5061static ssize_t rbd_client_addr_show(struct device *dev,
5062                                    struct device_attribute *attr, char *buf)
5063{
5064        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5065        struct ceph_entity_addr *client_addr =
5066            ceph_client_addr(rbd_dev->rbd_client->client);
5067
5068        return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5069                       le32_to_cpu(client_addr->nonce));
5070}
5071
5072static ssize_t rbd_client_id_show(struct device *dev,
5073                                  struct device_attribute *attr, char *buf)
5074{
5075        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5076
5077        return sprintf(buf, "client%lld\n",
5078                       ceph_client_gid(rbd_dev->rbd_client->client));
5079}
5080
5081static ssize_t rbd_cluster_fsid_show(struct device *dev,
5082                                     struct device_attribute *attr, char *buf)
5083{
5084        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5085
5086        return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5087}
5088
5089static ssize_t rbd_config_info_show(struct device *dev,
5090                                    struct device_attribute *attr, char *buf)
5091{
5092        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5093
5094        if (!capable(CAP_SYS_ADMIN))
5095                return -EPERM;
5096
5097        return sprintf(buf, "%s\n", rbd_dev->config_info);
5098}
5099
5100static ssize_t rbd_pool_show(struct device *dev,
5101                             struct device_attribute *attr, char *buf)
5102{
5103        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5104
5105        return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5106}
5107
5108static ssize_t rbd_pool_id_show(struct device *dev,
5109                             struct device_attribute *attr, char *buf)
5110{
5111        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5112
5113        return sprintf(buf, "%llu\n",
5114                        (unsigned long long) rbd_dev->spec->pool_id);
5115}
5116
5117static ssize_t rbd_pool_ns_show(struct device *dev,
5118                                struct device_attribute *attr, char *buf)
5119{
5120        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5121
5122        return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5123}
5124
5125static ssize_t rbd_name_show(struct device *dev,
5126                             struct device_attribute *attr, char *buf)
5127{
5128        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5129
5130        if (rbd_dev->spec->image_name)
5131                return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5132
5133        return sprintf(buf, "(unknown)\n");
5134}
5135
5136static ssize_t rbd_image_id_show(struct device *dev,
5137                             struct device_attribute *attr, char *buf)
5138{
5139        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5140
5141        return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5142}
5143
5144/*
5145 * Shows the name of the currently-mapped snapshot (or
5146 * RBD_SNAP_HEAD_NAME for the base image).
5147 */
5148static ssize_t rbd_snap_show(struct device *dev,
5149                             struct device_attribute *attr,
5150                             char *buf)
5151{
5152        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5153
5154        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5155}
5156
5157static ssize_t rbd_snap_id_show(struct device *dev,
5158                                struct device_attribute *attr, char *buf)
5159{
5160        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5161
5162        return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5163}
5164
5165/*
5166 * For a v2 image, shows the chain of parent images, separated by empty
5167 * lines.  For v1 images or if there is no parent, shows "(no parent
5168 * image)".
5169 */
5170static ssize_t rbd_parent_show(struct device *dev,
5171                               struct device_attribute *attr,
5172                               char *buf)
5173{
5174        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5175        ssize_t count = 0;
5176
5177        if (!rbd_dev->parent)
5178                return sprintf(buf, "(no parent image)\n");
5179
5180        for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5181                struct rbd_spec *spec = rbd_dev->parent_spec;
5182
5183                count += sprintf(&buf[count], "%s"
5184                            "pool_id %llu\npool_name %s\n"
5185                            "pool_ns %s\n"
5186                            "image_id %s\nimage_name %s\n"
5187                            "snap_id %llu\nsnap_name %s\n"
5188                            "overlap %llu\n",
5189                            !count ? "" : "\n", /* first? */
5190                            spec->pool_id, spec->pool_name,
5191                            spec->pool_ns ?: "",
5192                            spec->image_id, spec->image_name ?: "(unknown)",
5193                            spec->snap_id, spec->snap_name,
5194                            rbd_dev->parent_overlap);
5195        }
5196
5197        return count;
5198}
5199
5200static ssize_t rbd_image_refresh(struct device *dev,
5201                                 struct device_attribute *attr,
5202                                 const char *buf,
5203                                 size_t size)
5204{
5205        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5206        int ret;
5207
5208        if (!capable(CAP_SYS_ADMIN))
5209                return -EPERM;
5210
5211        ret = rbd_dev_refresh(rbd_dev);
5212        if (ret)
5213                return ret;
5214
5215        return size;
5216}
5217
5218static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5219static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5220static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5221static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5222static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5223static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5224static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5225static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5226static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5227static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5228static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5229static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5230static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5231static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5232static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5233static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5234static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5235
5236static struct attribute *rbd_attrs[] = {
5237        &dev_attr_size.attr,
5238        &dev_attr_features.attr,
5239        &dev_attr_major.attr,
5240        &dev_attr_minor.attr,
5241        &dev_attr_client_addr.attr,
5242        &dev_attr_client_id.attr,
5243        &dev_attr_cluster_fsid.attr,
5244        &dev_attr_config_info.attr,
5245        &dev_attr_pool.attr,
5246        &dev_attr_pool_id.attr,
5247        &dev_attr_pool_ns.attr,
5248        &dev_attr_name.attr,
5249        &dev_attr_image_id.attr,
5250        &dev_attr_current_snap.attr,
5251        &dev_attr_snap_id.attr,
5252        &dev_attr_parent.attr,
5253        &dev_attr_refresh.attr,
5254        NULL
5255};
5256
5257static struct attribute_group rbd_attr_group = {
5258        .attrs = rbd_attrs,
5259};
5260
5261static const struct attribute_group *rbd_attr_groups[] = {
5262        &rbd_attr_group,
5263        NULL
5264};
5265
5266static void rbd_dev_release(struct device *dev);
5267
5268static const struct device_type rbd_device_type = {
5269        .name           = "rbd",
5270        .groups         = rbd_attr_groups,
5271        .release        = rbd_dev_release,
5272};
5273
5274static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5275{
5276        kref_get(&spec->kref);
5277
5278        return spec;
5279}
5280
5281static void rbd_spec_free(struct kref *kref);
5282static void rbd_spec_put(struct rbd_spec *spec)
5283{
5284        if (spec)
5285                kref_put(&spec->kref, rbd_spec_free);
5286}
5287
5288static struct rbd_spec *rbd_spec_alloc(void)
5289{
5290        struct rbd_spec *spec;
5291
5292        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5293        if (!spec)
5294                return NULL;
5295
5296        spec->pool_id = CEPH_NOPOOL;
5297        spec->snap_id = CEPH_NOSNAP;
5298        kref_init(&spec->kref);
5299
5300        return spec;
5301}
5302
5303static void rbd_spec_free(struct kref *kref)
5304{
5305        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5306
5307        kfree(spec->pool_name);
5308        kfree(spec->pool_ns);
5309        kfree(spec->image_id);
5310        kfree(spec->image_name);
5311        kfree(spec->snap_name);
5312        kfree(spec);
5313}
5314
5315static void rbd_dev_free(struct rbd_device *rbd_dev)
5316{
5317        WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5318        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5319
5320        ceph_oid_destroy(&rbd_dev->header_oid);
5321        ceph_oloc_destroy(&rbd_dev->header_oloc);
5322        kfree(rbd_dev->config_info);
5323
5324        rbd_put_client(rbd_dev->rbd_client);
5325        rbd_spec_put(rbd_dev->spec);
5326        kfree(rbd_dev->opts);
5327        kfree(rbd_dev);
5328}
5329
5330static void rbd_dev_release(struct device *dev)
5331{
5332        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5333        bool need_put = !!rbd_dev->opts;
5334
5335        if (need_put) {
5336                destroy_workqueue(rbd_dev->task_wq);
5337                ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5338        }
5339
5340        rbd_dev_free(rbd_dev);
5341
5342        /*
5343         * This is racy, but way better than putting module outside of
5344         * the release callback.  The race window is pretty small, so
5345         * doing something similar to dm (dm-builtin.c) is overkill.
5346         */
5347        if (need_put)
5348                module_put(THIS_MODULE);
5349}
5350
5351static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5352                                           struct rbd_spec *spec)
5353{
5354        struct rbd_device *rbd_dev;
5355
5356        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5357        if (!rbd_dev)
5358                return NULL;
5359
5360        spin_lock_init(&rbd_dev->lock);
5361        INIT_LIST_HEAD(&rbd_dev->node);
5362        init_rwsem(&rbd_dev->header_rwsem);
5363
5364        rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5365        ceph_oid_init(&rbd_dev->header_oid);
5366        rbd_dev->header_oloc.pool = spec->pool_id;
5367        if (spec->pool_ns) {
5368                WARN_ON(!*spec->pool_ns);
5369                rbd_dev->header_oloc.pool_ns =
5370                    ceph_find_or_create_string(spec->pool_ns,
5371                                               strlen(spec->pool_ns));
5372        }
5373
5374        mutex_init(&rbd_dev->watch_mutex);
5375        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5376        INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5377
5378        init_rwsem(&rbd_dev->lock_rwsem);
5379        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5380        INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5381        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5382        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5383        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5384        spin_lock_init(&rbd_dev->lock_lists_lock);
5385        INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5386        INIT_LIST_HEAD(&rbd_dev->running_list);
5387        init_completion(&rbd_dev->acquire_wait);
5388        init_completion(&rbd_dev->releasing_wait);
5389
5390        spin_lock_init(&rbd_dev->object_map_lock);
5391
5392        rbd_dev->dev.bus = &rbd_bus_type;
5393        rbd_dev->dev.type = &rbd_device_type;
5394        rbd_dev->dev.parent = &rbd_root_dev;
5395        device_initialize(&rbd_dev->dev);
5396
5397        rbd_dev->rbd_client = rbdc;
5398        rbd_dev->spec = spec;
5399
5400        return rbd_dev;
5401}
5402
5403/*
5404 * Create a mapping rbd_dev.
5405 */
5406static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5407                                         struct rbd_spec *spec,
5408                                         struct rbd_options *opts)
5409{
5410        struct rbd_device *rbd_dev;
5411
5412        rbd_dev = __rbd_dev_create(rbdc, spec);
5413        if (!rbd_dev)
5414                return NULL;
5415
5416        rbd_dev->opts = opts;
5417
5418        /* get an id and fill in device name */
5419        rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5420                                         minor_to_rbd_dev_id(1 << MINORBITS),
5421                                         GFP_KERNEL);
5422        if (rbd_dev->dev_id < 0)
5423                goto fail_rbd_dev;
5424
5425        sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5426        rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5427                                                   rbd_dev->name);
5428        if (!rbd_dev->task_wq)
5429                goto fail_dev_id;
5430
5431        /* we have a ref from do_rbd_add() */
5432        __module_get(THIS_MODULE);
5433
5434        dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5435        return rbd_dev;
5436
5437fail_dev_id:
5438        ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5439fail_rbd_dev:
5440        rbd_dev_free(rbd_dev);
5441        return NULL;
5442}
5443
5444static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5445{
5446        if (rbd_dev)
5447                put_device(&rbd_dev->dev);
5448}
5449
5450/*
5451 * Get the size and object order for an image snapshot, or if
5452 * snap_id is CEPH_NOSNAP, gets this information for the base
5453 * image.
5454 */
5455static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5456                                u8 *order, u64 *snap_size)
5457{
5458        __le64 snapid = cpu_to_le64(snap_id);
5459        int ret;
5460        struct {
5461                u8 order;
5462                __le64 size;
5463        } __attribute__ ((packed)) size_buf = { 0 };
5464
5465        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5466                                  &rbd_dev->header_oloc, "get_size",
5467                                  &snapid, sizeof(snapid),
5468                                  &size_buf, sizeof(size_buf));
5469        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5470        if (ret < 0)
5471                return ret;
5472        if (ret < sizeof (size_buf))
5473                return -ERANGE;
5474
5475        if (order) {
5476                *order = size_buf.order;
5477                dout("  order %u", (unsigned int)*order);
5478        }
5479        *snap_size = le64_to_cpu(size_buf.size);
5480
5481        dout("  snap_id 0x%016llx snap_size = %llu\n",
5482                (unsigned long long)snap_id,
5483                (unsigned long long)*snap_size);
5484
5485        return 0;
5486}
5487
5488static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5489{
5490        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5491                                        &rbd_dev->header.obj_order,
5492                                        &rbd_dev->header.image_size);
5493}
5494
5495static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5496{
5497        size_t size;
5498        void *reply_buf;
5499        int ret;
5500        void *p;
5501
5502        /* Response will be an encoded string, which includes a length */
5503        size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5504        reply_buf = kzalloc(size, GFP_KERNEL);
5505        if (!reply_buf)
5506                return -ENOMEM;
5507
5508        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5509                                  &rbd_dev->header_oloc, "get_object_prefix",
5510                                  NULL, 0, reply_buf, size);
5511        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5512        if (ret < 0)
5513                goto out;
5514
5515        p = reply_buf;
5516        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5517                                                p + ret, NULL, GFP_NOIO);
5518        ret = 0;
5519
5520        if (IS_ERR(rbd_dev->header.object_prefix)) {
5521                ret = PTR_ERR(rbd_dev->header.object_prefix);
5522                rbd_dev->header.object_prefix = NULL;
5523        } else {
5524                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5525        }
5526out:
5527        kfree(reply_buf);
5528
5529        return ret;
5530}
5531
5532static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5533                                     bool read_only, u64 *snap_features)
5534{
5535        struct {
5536                __le64 snap_id;
5537                u8 read_only;
5538        } features_in;
5539        struct {
5540                __le64 features;
5541                __le64 incompat;
5542        } __attribute__ ((packed)) features_buf = { 0 };
5543        u64 unsup;
5544        int ret;
5545
5546        features_in.snap_id = cpu_to_le64(snap_id);
5547        features_in.read_only = read_only;
5548
5549        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5550                                  &rbd_dev->header_oloc, "get_features",
5551                                  &features_in, sizeof(features_in),
5552                                  &features_buf, sizeof(features_buf));
5553        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5554        if (ret < 0)
5555                return ret;
5556        if (ret < sizeof (features_buf))
5557                return -ERANGE;
5558
5559        unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5560        if (unsup) {
5561                rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5562                         unsup);
5563                return -ENXIO;
5564        }
5565
5566        *snap_features = le64_to_cpu(features_buf.features);
5567
5568        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5569                (unsigned long long)snap_id,
5570                (unsigned long long)*snap_features,
5571                (unsigned long long)le64_to_cpu(features_buf.incompat));
5572
5573        return 0;
5574}
5575
5576static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5577{
5578        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5579                                         rbd_is_ro(rbd_dev),
5580                                         &rbd_dev->header.features);
5581}
5582
5583/*
5584 * These are generic image flags, but since they are used only for
5585 * object map, store them in rbd_dev->object_map_flags.
5586 *
5587 * For the same reason, this function is called only on object map
5588 * (re)load and not on header refresh.
5589 */
5590static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5591{
5592        __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5593        __le64 flags;
5594        int ret;
5595
5596        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5597                                  &rbd_dev->header_oloc, "get_flags",
5598                                  &snapid, sizeof(snapid),
5599                                  &flags, sizeof(flags));
5600        if (ret < 0)
5601                return ret;
5602        if (ret < sizeof(flags))
5603                return -EBADMSG;
5604
5605        rbd_dev->object_map_flags = le64_to_cpu(flags);
5606        return 0;
5607}
5608
/*
 * Parent image description as filled in by decode_parent_image_spec(),
 * __get_parent_info() and __get_parent_info_legacy().
 */
struct parent_image_info {
        u64             pool_id;
        const char      *pool_ns;       /* allocated string; the legacy path never sets it */
        const char      *image_id;      /* allocated string */
        u64             snap_id;

        bool            has_overlap;    /* overlap is only decoded when this is set */
        u64             overlap;
};
5618
5619/*
5620 * The caller is responsible for @pii.
5621 */
5622static int decode_parent_image_spec(void **p, void *end,
5623                                    struct parent_image_info *pii)
5624{
5625        u8 struct_v;
5626        u32 struct_len;
5627        int ret;
5628
5629        ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5630                                  &struct_v, &struct_len);
5631        if (ret)
5632                return ret;
5633
5634        ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5635        pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5636        if (IS_ERR(pii->pool_ns)) {
5637                ret = PTR_ERR(pii->pool_ns);
5638                pii->pool_ns = NULL;
5639                return ret;
5640        }
5641        pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5642        if (IS_ERR(pii->image_id)) {
5643                ret = PTR_ERR(pii->image_id);
5644                pii->image_id = NULL;
5645                return ret;
5646        }
5647        ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5648        return 0;
5649
5650e_inval:
5651        return -EINVAL;
5652}
5653
5654static int __get_parent_info(struct rbd_device *rbd_dev,
5655                             struct page *req_page,
5656                             struct page *reply_page,
5657                             struct parent_image_info *pii)
5658{
5659        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5660        size_t reply_len = PAGE_SIZE;
5661        void *p, *end;
5662        int ret;
5663
5664        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5665                             "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5666                             req_page, sizeof(u64), &reply_page, &reply_len);
5667        if (ret)
5668                return ret == -EOPNOTSUPP ? 1 : ret;
5669
5670        p = page_address(reply_page);
5671        end = p + reply_len;
5672        ret = decode_parent_image_spec(&p, end, pii);
5673        if (ret)
5674                return ret;
5675
5676        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5677                             "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5678                             req_page, sizeof(u64), &reply_page, &reply_len);
5679        if (ret)
5680                return ret;
5681
5682        p = page_address(reply_page);
5683        end = p + reply_len;
5684        ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5685        if (pii->has_overlap)
5686                ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5687
5688        return 0;
5689
5690e_inval:
5691        return -EINVAL;
5692}
5693
5694/*
5695 * The caller is responsible for @pii.
5696 */
5697static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5698                                    struct page *req_page,
5699                                    struct page *reply_page,
5700                                    struct parent_image_info *pii)
5701{
5702        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5703        size_t reply_len = PAGE_SIZE;
5704        void *p, *end;
5705        int ret;
5706
5707        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5708                             "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5709                             req_page, sizeof(u64), &reply_page, &reply_len);
5710        if (ret)
5711                return ret;
5712
5713        p = page_address(reply_page);
5714        end = p + reply_len;
5715        ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5716        pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5717        if (IS_ERR(pii->image_id)) {
5718                ret = PTR_ERR(pii->image_id);
5719                pii->image_id = NULL;
5720                return ret;
5721        }
5722        ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5723        pii->has_overlap = true;
5724        ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5725
5726        return 0;
5727
5728e_inval:
5729        return -EINVAL;
5730}
5731
5732static int get_parent_info(struct rbd_device *rbd_dev,
5733                           struct parent_image_info *pii)
5734{
5735        struct page *req_page, *reply_page;
5736        void *p;
5737        int ret;
5738
5739        req_page = alloc_page(GFP_KERNEL);
5740        if (!req_page)
5741                return -ENOMEM;
5742
5743        reply_page = alloc_page(GFP_KERNEL);
5744        if (!reply_page) {
5745                __free_page(req_page);
5746                return -ENOMEM;
5747        }
5748
5749        p = page_address(req_page);
5750        ceph_encode_64(&p, rbd_dev->spec->snap_id);
5751        ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5752        if (ret > 0)
5753                ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5754                                               pii);
5755
5756        __free_page(req_page);
5757        __free_page(reply_page);
5758        return ret;
5759}
5760
5761static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5762{
5763        struct rbd_spec *parent_spec;
5764        struct parent_image_info pii = { 0 };
5765        int ret;
5766
5767        parent_spec = rbd_spec_alloc();
5768        if (!parent_spec)
5769                return -ENOMEM;
5770
5771        ret = get_parent_info(rbd_dev, &pii);
5772        if (ret)
5773                goto out_err;
5774
5775        dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5776             __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5777             pii.has_overlap, pii.overlap);
5778
5779        if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5780                /*
5781                 * Either the parent never existed, or we have
5782                 * record of it but the image got flattened so it no
5783                 * longer has a parent.  When the parent of a
5784                 * layered image disappears we immediately set the
5785                 * overlap to 0.  The effect of this is that all new
5786                 * requests will be treated as if the image had no
5787                 * parent.
5788                 *
5789                 * If !pii.has_overlap, the parent image spec is not
5790                 * applicable.  It's there to avoid duplication in each
5791                 * snapshot record.
5792                 */
5793                if (rbd_dev->parent_overlap) {
5794                        rbd_dev->parent_overlap = 0;
5795                        rbd_dev_parent_put(rbd_dev);
5796                        pr_info("%s: clone image has been flattened\n",
5797                                rbd_dev->disk->disk_name);
5798                }
5799
5800                goto out;       /* No parent?  No problem. */
5801        }
5802
5803        /* The ceph file layout needs to fit pool id in 32 bits */
5804
5805        ret = -EIO;
5806        if (pii.pool_id > (u64)U32_MAX) {
5807                rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5808                        (unsigned long long)pii.pool_id, U32_MAX);
5809                goto out_err;
5810        }
5811
5812        /*
5813         * The parent won't change (except when the clone is
5814         * flattened, already handled that).  So we only need to
5815         * record the parent spec we have not already done so.
5816         */
5817        if (!rbd_dev->parent_spec) {
5818                parent_spec->pool_id = pii.pool_id;
5819                if (pii.pool_ns && *pii.pool_ns) {
5820                        parent_spec->pool_ns = pii.pool_ns;
5821                        pii.pool_ns = NULL;
5822                }
5823                parent_spec->image_id = pii.image_id;
5824                pii.image_id = NULL;
5825                parent_spec->snap_id = pii.snap_id;
5826
5827                rbd_dev->parent_spec = parent_spec;
5828                parent_spec = NULL;     /* rbd_dev now owns this */
5829        }
5830
5831        /*
5832         * We always update the parent overlap.  If it's zero we issue
5833         * a warning, as we will proceed as if there was no parent.
5834         */
5835        if (!pii.overlap) {
5836                if (parent_spec) {
5837                        /* refresh, careful to warn just once */
5838                        if (rbd_dev->parent_overlap)
5839                                rbd_warn(rbd_dev,
5840                                    "clone now standalone (overlap became 0)");
5841                } else {
5842                        /* initial probe */
5843                        rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5844                }
5845        }
5846        rbd_dev->parent_overlap = pii.overlap;
5847
5848out:
5849        ret = 0;
5850out_err:
5851        kfree(pii.pool_ns);
5852        kfree(pii.image_id);
5853        rbd_spec_put(parent_spec);
5854        return ret;
5855}
5856
5857static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5858{
5859        struct {
5860                __le64 stripe_unit;
5861                __le64 stripe_count;
5862        } __attribute__ ((packed)) striping_info_buf = { 0 };
5863        size_t size = sizeof (striping_info_buf);
5864        void *p;
5865        int ret;
5866
5867        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5868                                &rbd_dev->header_oloc, "get_stripe_unit_count",
5869                                NULL, 0, &striping_info_buf, size);
5870        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5871        if (ret < 0)
5872                return ret;
5873        if (ret < size)
5874                return -ERANGE;
5875
5876        p = &striping_info_buf;
5877        rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5878        rbd_dev->header.stripe_count = ceph_decode_64(&p);
5879        return 0;
5880}
5881
5882static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5883{
5884        __le64 data_pool_id;
5885        int ret;
5886
5887        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5888                                  &rbd_dev->header_oloc, "get_data_pool",
5889                                  NULL, 0, &data_pool_id, sizeof(data_pool_id));
5890        if (ret < 0)
5891                return ret;
5892        if (ret < sizeof(data_pool_id))
5893                return -EBADMSG;
5894
5895        rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5896        WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5897        return 0;
5898}
5899
5900static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5901{
5902        CEPH_DEFINE_OID_ONSTACK(oid);
5903        size_t image_id_size;
5904        char *image_id;
5905        void *p;
5906        void *end;
5907        size_t size;
5908        void *reply_buf = NULL;
5909        size_t len = 0;
5910        char *image_name = NULL;
5911        int ret;
5912
5913        rbd_assert(!rbd_dev->spec->image_name);
5914
5915        len = strlen(rbd_dev->spec->image_id);
5916        image_id_size = sizeof (__le32) + len;
5917        image_id = kmalloc(image_id_size, GFP_KERNEL);
5918        if (!image_id)
5919                return NULL;
5920
5921        p = image_id;
5922        end = image_id + image_id_size;
5923        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5924
5925        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5926        reply_buf = kmalloc(size, GFP_KERNEL);
5927        if (!reply_buf)
5928                goto out;
5929
5930        ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5931        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5932                                  "dir_get_name", image_id, image_id_size,
5933                                  reply_buf, size);
5934        if (ret < 0)
5935                goto out;
5936        p = reply_buf;
5937        end = reply_buf + ret;
5938
5939        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5940        if (IS_ERR(image_name))
5941                image_name = NULL;
5942        else
5943                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5944out:
5945        kfree(reply_buf);
5946        kfree(image_id);
5947
5948        return image_name;
5949}
5950
5951static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5952{
5953        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5954        const char *snap_name;
5955        u32 which = 0;
5956
5957        /* Skip over names until we find the one we are looking for */
5958
5959        snap_name = rbd_dev->header.snap_names;
5960        while (which < snapc->num_snaps) {
5961                if (!strcmp(name, snap_name))
5962                        return snapc->snaps[which];
5963                snap_name += strlen(snap_name) + 1;
5964                which++;
5965        }
5966        return CEPH_NOSNAP;
5967}
5968
5969static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5970{
5971        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5972        u32 which;
5973        bool found = false;
5974        u64 snap_id;
5975
5976        for (which = 0; !found && which < snapc->num_snaps; which++) {
5977                const char *snap_name;
5978
5979                snap_id = snapc->snaps[which];
5980                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5981                if (IS_ERR(snap_name)) {
5982                        /* ignore no-longer existing snapshots */
5983                        if (PTR_ERR(snap_name) == -ENOENT)
5984                                continue;
5985                        else
5986                                break;
5987                }
5988                found = !strcmp(name, snap_name);
5989                kfree(snap_name);
5990        }
5991        return found ? snap_id : CEPH_NOSNAP;
5992}
5993
5994/*
5995 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5996 * no snapshot by that name is found, or if an error occurs.
5997 */
5998static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5999{
6000        if (rbd_dev->image_format == 1)
6001                return rbd_v1_snap_id_by_name(rbd_dev, name);
6002
6003        return rbd_v2_snap_id_by_name(rbd_dev, name);
6004}
6005
6006/*
6007 * An image being mapped will have everything but the snap id.
6008 */
6009static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6010{
6011        struct rbd_spec *spec = rbd_dev->spec;
6012
6013        rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6014        rbd_assert(spec->image_id && spec->image_name);
6015        rbd_assert(spec->snap_name);
6016
6017        if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6018                u64 snap_id;
6019
6020                snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6021                if (snap_id == CEPH_NOSNAP)
6022                        return -ENOENT;
6023
6024                spec->snap_id = snap_id;
6025        } else {
6026                spec->snap_id = CEPH_NOSNAP;
6027        }
6028
6029        return 0;
6030}
6031
6032/*
6033 * A parent image will have all ids but none of the names.
6034 *
6035 * All names in an rbd spec are dynamically allocated.  It's OK if we
6036 * can't figure out the name for an image id.
6037 */
6038static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6039{
6040        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6041        struct rbd_spec *spec = rbd_dev->spec;
6042        const char *pool_name;
6043        const char *image_name;
6044        const char *snap_name;
6045        int ret;
6046
6047        rbd_assert(spec->pool_id != CEPH_NOPOOL);
6048        rbd_assert(spec->image_id);
6049        rbd_assert(spec->snap_id != CEPH_NOSNAP);
6050
6051        /* Get the pool name; we have to make our own copy of this */
6052
6053        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6054        if (!pool_name) {
6055                rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6056                return -EIO;
6057        }
6058        pool_name = kstrdup(pool_name, GFP_KERNEL);
6059        if (!pool_name)
6060                return -ENOMEM;
6061
6062        /* Fetch the image name; tolerate failure here */
6063
6064        image_name = rbd_dev_image_name(rbd_dev);
6065        if (!image_name)
6066                rbd_warn(rbd_dev, "unable to get image name");
6067
6068        /* Fetch the snapshot name */
6069
6070        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6071        if (IS_ERR(snap_name)) {
6072                ret = PTR_ERR(snap_name);
6073                goto out_err;
6074        }
6075
6076        spec->pool_name = pool_name;
6077        spec->image_name = image_name;
6078        spec->snap_name = snap_name;
6079
6080        return 0;
6081
6082out_err:
6083        kfree(image_name);
6084        kfree(pool_name);
6085        return ret;
6086}
6087
6088static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6089{
6090        size_t size;
6091        int ret;
6092        void *reply_buf;
6093        void *p;
6094        void *end;
6095        u64 seq;
6096        u32 snap_count;
6097        struct ceph_snap_context *snapc;
6098        u32 i;
6099
6100        /*
6101         * We'll need room for the seq value (maximum snapshot id),
6102         * snapshot count, and array of that many snapshot ids.
6103         * For now we have a fixed upper limit on the number we're
6104         * prepared to receive.
6105         */
6106        size = sizeof (__le64) + sizeof (__le32) +
6107                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
6108        reply_buf = kzalloc(size, GFP_KERNEL);
6109        if (!reply_buf)
6110                return -ENOMEM;
6111
6112        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6113                                  &rbd_dev->header_oloc, "get_snapcontext",
6114                                  NULL, 0, reply_buf, size);
6115        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6116        if (ret < 0)
6117                goto out;
6118
6119        p = reply_buf;
6120        end = reply_buf + ret;
6121        ret = -ERANGE;
6122        ceph_decode_64_safe(&p, end, seq, out);
6123        ceph_decode_32_safe(&p, end, snap_count, out);
6124
6125        /*
6126         * Make sure the reported number of snapshot ids wouldn't go
6127         * beyond the end of our buffer.  But before checking that,
6128         * make sure the computed size of the snapshot context we
6129         * allocate is representable in a size_t.
6130         */
6131        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6132                                 / sizeof (u64)) {
6133                ret = -EINVAL;
6134                goto out;
6135        }
6136        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6137                goto out;
6138        ret = 0;
6139
6140        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6141        if (!snapc) {
6142                ret = -ENOMEM;
6143                goto out;
6144        }
6145        snapc->seq = seq;
6146        for (i = 0; i < snap_count; i++)
6147                snapc->snaps[i] = ceph_decode_64(&p);
6148
6149        ceph_put_snap_context(rbd_dev->header.snapc);
6150        rbd_dev->header.snapc = snapc;
6151
6152        dout("  snap context seq = %llu, snap_count = %u\n",
6153                (unsigned long long)seq, (unsigned int)snap_count);
6154out:
6155        kfree(reply_buf);
6156
6157        return ret;
6158}
6159
6160static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6161                                        u64 snap_id)
6162{
6163        size_t size;
6164        void *reply_buf;
6165        __le64 snapid;
6166        int ret;
6167        void *p;
6168        void *end;
6169        char *snap_name;
6170
6171        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6172        reply_buf = kmalloc(size, GFP_KERNEL);
6173        if (!reply_buf)
6174                return ERR_PTR(-ENOMEM);
6175
6176        snapid = cpu_to_le64(snap_id);
6177        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6178                                  &rbd_dev->header_oloc, "get_snapshot_name",
6179                                  &snapid, sizeof(snapid), reply_buf, size);
6180        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6181        if (ret < 0) {
6182                snap_name = ERR_PTR(ret);
6183                goto out;
6184        }
6185
6186        p = reply_buf;
6187        end = reply_buf + ret;
6188        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6189        if (IS_ERR(snap_name))
6190                goto out;
6191
6192        dout("  snap_id 0x%016llx snap_name = %s\n",
6193                (unsigned long long)snap_id, snap_name);
6194out:
6195        kfree(reply_buf);
6196
6197        return snap_name;
6198}
6199
6200static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6201{
6202        bool first_time = rbd_dev->header.object_prefix == NULL;
6203        int ret;
6204
6205        ret = rbd_dev_v2_image_size(rbd_dev);
6206        if (ret)
6207                return ret;
6208
6209        if (first_time) {
6210                ret = rbd_dev_v2_header_onetime(rbd_dev);
6211                if (ret)
6212                        return ret;
6213        }
6214
6215        ret = rbd_dev_v2_snap_context(rbd_dev);
6216        if (ret && first_time) {
6217                kfree(rbd_dev->header.object_prefix);
6218                rbd_dev->header.object_prefix = NULL;
6219        }
6220
6221        return ret;
6222}
6223
6224static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6225{
6226        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6227
6228        if (rbd_dev->image_format == 1)
6229                return rbd_dev_v1_header_info(rbd_dev);
6230
6231        return rbd_dev_v2_header_info(rbd_dev);
6232}
6233
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token that starts there, i.e.
 * the run of non-white-space characters.  *buf must be terminated
 * with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, spaces);

	*buf = start;			/* token begins here */

	return strcspn(start, spaces);	/* and is this long */
}
6252
6253/*
6254 * Finds the next token in *buf, dynamically allocates a buffer big
6255 * enough to hold a copy of it, and copies the token into the new
6256 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6257 * that a duplicate buffer is created even for a zero-length token.
6258 *
6259 * Returns a pointer to the newly-allocated duplicate, or a null
6260 * pointer if memory for the duplicate was not available.  If
6261 * the lenp argument is a non-null pointer, the length of the token
6262 * (not including the '\0') is returned in *lenp.
6263 *
6264 * If successful, the *buf pointer will be updated to point beyond
6265 * the end of the found token.
6266 *
6267 * Note: uses GFP_KERNEL for allocation.
6268 */
6269static inline char *dup_token(const char **buf, size_t *lenp)
6270{
6271        char *dup;
6272        size_t len;
6273
6274        len = next_token(buf);
6275        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6276        if (!dup)
6277                return NULL;
6278        *(dup + len) = '\0';
6279        *buf += len;
6280
6281        if (lenp)
6282                *lenp = len;
6283
6284        return dup;
6285}
6286
6287static int rbd_parse_param(struct fs_parameter *param,
6288                            struct rbd_parse_opts_ctx *pctx)
6289{
6290        struct rbd_options *opt = pctx->opts;
6291        struct fs_parse_result result;
6292        struct p_log log = {.prefix = "rbd"};
6293        int token, ret;
6294
6295        ret = ceph_parse_param(param, pctx->copts, NULL);
6296        if (ret != -ENOPARAM)
6297                return ret;
6298
6299        token = __fs_parse(&log, rbd_parameters, param, &result);
6300        dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6301        if (token < 0) {
6302                if (token == -ENOPARAM)
6303                        return inval_plog(&log, "Unknown parameter '%s'",
6304                                          param->key);
6305                return token;
6306        }
6307
6308        switch (token) {
6309        case Opt_queue_depth:
6310                if (result.uint_32 < 1)
6311                        goto out_of_range;
6312                opt->queue_depth = result.uint_32;
6313                break;
6314        case Opt_alloc_size:
6315                if (result.uint_32 < SECTOR_SIZE)
6316                        goto out_of_range;
6317                if (!is_power_of_2(result.uint_32))
6318                        return inval_plog(&log, "alloc_size must be a power of 2");
6319                opt->alloc_size = result.uint_32;
6320                break;
6321        case Opt_lock_timeout:
6322                /* 0 is "wait forever" (i.e. infinite timeout) */
6323                if (result.uint_32 > INT_MAX / 1000)
6324                        goto out_of_range;
6325                opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6326                break;
6327        case Opt_pool_ns:
6328                kfree(pctx->spec->pool_ns);
6329                pctx->spec->pool_ns = param->string;
6330                param->string = NULL;
6331                break;
6332        case Opt_compression_hint:
6333                switch (result.uint_32) {
6334                case Opt_compression_hint_none:
6335                        opt->alloc_hint_flags &=
6336                            ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6337                              CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6338                        break;
6339                case Opt_compression_hint_compressible:
6340                        opt->alloc_hint_flags |=
6341                            CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6342                        opt->alloc_hint_flags &=
6343                            ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6344                        break;
6345                case Opt_compression_hint_incompressible:
6346                        opt->alloc_hint_flags |=
6347                            CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6348                        opt->alloc_hint_flags &=
6349                            ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6350                        break;
6351                default:
6352                        BUG();
6353                }
6354                break;
6355        case Opt_read_only:
6356                opt->read_only = true;
6357                break;
6358        case Opt_read_write:
6359                opt->read_only = false;
6360                break;
6361        case Opt_lock_on_read:
6362                opt->lock_on_read = true;
6363                break;
6364        case Opt_exclusive:
6365                opt->exclusive = true;
6366                break;
6367        case Opt_notrim:
6368                opt->trim = false;
6369                break;
6370        default:
6371                BUG();
6372        }
6373
6374        return 0;
6375
6376out_of_range:
6377        return inval_plog(&log, "%s out of range", param->key);
6378}
6379
6380/*
6381 * This duplicates most of generic_parse_monolithic(), untying it from
6382 * fs_context and skipping standard superblock and security options.
6383 */
6384static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6385{
6386        char *key;
6387        int ret = 0;
6388
6389        dout("%s '%s'\n", __func__, options);
6390        while ((key = strsep(&options, ",")) != NULL) {
6391                if (*key) {
6392                        struct fs_parameter param = {
6393                                .key    = key,
6394                                .type   = fs_value_is_flag,
6395                        };
6396                        char *value = strchr(key, '=');
6397                        size_t v_len = 0;
6398
6399                        if (value) {
6400                                if (value == key)
6401                                        continue;
6402                                *value++ = 0;
6403                                v_len = strlen(value);
6404                                param.string = kmemdup_nul(value, v_len,
6405                                                           GFP_KERNEL);
6406                                if (!param.string)
6407                                        return -ENOMEM;
6408                                param.type = fs_value_is_string;
6409                        }
6410                        param.size = v_len;
6411
6412                        ret = rbd_parse_param(&param, pctx);
6413                        kfree(param.string);
6414                        if (ret)
6415                                break;
6416                }
6417        }
6418
6419        return ret;
6420}
6421
6422/*
6423 * Parse the options provided for an "rbd add" (i.e., rbd image
6424 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6425 * and the data written is passed here via a NUL-terminated buffer.
6426 * Returns 0 if successful or an error code otherwise.
6427 *
6428 * The information extracted from these options is recorded in
6429 * the other parameters which return dynamically-allocated
6430 * structures:
6431 *  ceph_opts
6432 *      The address of a pointer that will refer to a ceph options
6433 *      structure.  Caller must release the returned pointer using
6434 *      ceph_destroy_options() when it is no longer needed.
6435 *  rbd_opts
6436 *      Address of an rbd options pointer.  Fully initialized by
6437 *      this function; caller must release with kfree().
6438 *  spec
6439 *      Address of an rbd image specification pointer.  Fully
6440 *      initialized by this function based on parsed options.
6441 *      Caller must release with rbd_spec_put().
6442 *
6443 * The options passed take this form:
6444 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
6445 * where:
6446 *  <mon_addrs>
6447 *      A comma-separated list of one or more monitor addresses.
6448 *      A monitor address is an ip address, optionally followed
6449 *      by a port number (separated by a colon).
6450 *        I.e.:  ip1[:port1][,ip2[:port2]...]
6451 *  <options>
6452 *      A comma-separated list of ceph and/or rbd options.
6453 *  <pool_name>
6454 *      The name of the rados pool containing the rbd image.
6455 *  <image_name>
6456 *      The name of the image in that pool to map.
6457 *  <snap_id>
6458 *      An optional snapshot id.  If provided, the mapping will
6459 *      present data from the image at the time that snapshot was
6460 *      created.  The image head is used if no snapshot id is
6461 *      provided.  Snapshot mappings are always read-only.
6462 */
6463static int rbd_add_parse_args(const char *buf,
6464                                struct ceph_options **ceph_opts,
6465                                struct rbd_options **opts,
6466                                struct rbd_spec **rbd_spec)
6467{
6468        size_t len;
6469        char *options;
6470        const char *mon_addrs;
6471        char *snap_name;
6472        size_t mon_addrs_size;
6473        struct rbd_parse_opts_ctx pctx = { 0 };
6474        int ret;
6475
6476        /* The first four tokens are required */
6477
6478        len = next_token(&buf);
6479        if (!len) {
6480                rbd_warn(NULL, "no monitor address(es) provided");
6481                return -EINVAL;
6482        }
6483        mon_addrs = buf;
6484        mon_addrs_size = len;
6485        buf += len;
6486
6487        ret = -EINVAL;
6488        options = dup_token(&buf, NULL);
6489        if (!options)
6490                return -ENOMEM;
6491        if (!*options) {
6492                rbd_warn(NULL, "no options provided");
6493                goto out_err;
6494        }
6495
6496        pctx.spec = rbd_spec_alloc();
6497        if (!pctx.spec)
6498                goto out_mem;
6499
6500        pctx.spec->pool_name = dup_token(&buf, NULL);
6501        if (!pctx.spec->pool_name)
6502                goto out_mem;
6503        if (!*pctx.spec->pool_name) {
6504                rbd_warn(NULL, "no pool name provided");
6505                goto out_err;
6506        }
6507
6508        pctx.spec->image_name = dup_token(&buf, NULL);
6509        if (!pctx.spec->image_name)
6510                goto out_mem;
6511        if (!*pctx.spec->image_name) {
6512                rbd_warn(NULL, "no image name provided");
6513                goto out_err;
6514        }
6515
6516        /*
6517         * Snapshot name is optional; default is to use "-"
6518         * (indicating the head/no snapshot).
6519         */
6520        len = next_token(&buf);
6521        if (!len) {
6522                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6523                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6524        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6525                ret = -ENAMETOOLONG;
6526                goto out_err;
6527        }
6528        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6529        if (!snap_name)
6530                goto out_mem;
6531        *(snap_name + len) = '\0';
6532        pctx.spec->snap_name = snap_name;
6533
6534        pctx.copts = ceph_alloc_options();
6535        if (!pctx.copts)
6536                goto out_mem;
6537
6538        /* Initialize all rbd options to the defaults */
6539
6540        pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6541        if (!pctx.opts)
6542                goto out_mem;
6543
6544        pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6545        pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6546        pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6547        pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6548        pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6549        pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6550        pctx.opts->trim = RBD_TRIM_DEFAULT;
6551
6552        ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
6553        if (ret)
6554                goto out_err;
6555
6556        ret = rbd_parse_options(options, &pctx);
6557        if (ret)
6558                goto out_err;
6559
6560        *ceph_opts = pctx.copts;
6561        *opts = pctx.opts;
6562        *rbd_spec = pctx.spec;
6563        kfree(options);
6564        return 0;
6565
6566out_mem:
6567        ret = -ENOMEM;
6568out_err:
6569        kfree(pctx.opts);
6570        ceph_destroy_options(pctx.copts);
6571        rbd_spec_put(pctx.spec);
6572        kfree(options);
6573        return ret;
6574}
6575
6576static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6577{
6578        down_write(&rbd_dev->lock_rwsem);
6579        if (__rbd_is_lock_owner(rbd_dev))
6580                __rbd_release_lock(rbd_dev);
6581        up_write(&rbd_dev->lock_rwsem);
6582}
6583
6584/*
6585 * If the wait is interrupted, an error is returned even if the lock
6586 * was successfully acquired.  rbd_dev_image_unlock() will release it
6587 * if needed.
6588 */
6589static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6590{
6591        long ret;
6592
6593        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6594                if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6595                        return 0;
6596
6597                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6598                return -EINVAL;
6599        }
6600
6601        if (rbd_is_ro(rbd_dev))
6602                return 0;
6603
6604        rbd_assert(!rbd_is_lock_owner(rbd_dev));
6605        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6606        ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6607                            ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6608        if (ret > 0) {
6609                ret = rbd_dev->acquire_err;
6610        } else {
6611                cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6612                if (!ret)
6613                        ret = -ETIMEDOUT;
6614        }
6615
6616        if (ret) {
6617                rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6618                return ret;
6619        }
6620
6621        /*
6622         * The lock may have been released by now, unless automatic lock
6623         * transitions are disabled.
6624         */
6625        rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6626        return 0;
6627}
6628
6629/*
6630 * An rbd format 2 image has a unique identifier, distinct from the
6631 * name given to it by the user.  Internally, that identifier is
6632 * what's used to specify the names of objects related to the image.
6633 *
6634 * A special "rbd id" object is used to map an rbd image name to its
6635 * id.  If that object doesn't exist, then there is no v2 rbd image
6636 * with the supplied name.
6637 *
6638 * This function will record the given rbd_dev's image_id field if
6639 * it can be determined, and in that case will return 0.  If any
6640 * errors occur a negative errno will be returned and the rbd_dev's
6641 * image_id field will be unchanged (and should be NULL).
6642 */
6643static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6644{
6645        int ret;
6646        size_t size;
6647        CEPH_DEFINE_OID_ONSTACK(oid);
6648        void *response;
6649        char *image_id;
6650
6651        /*
6652         * When probing a parent image, the image id is already
6653         * known (and the image name likely is not).  There's no
6654         * need to fetch the image id again in this case.  We
6655         * do still need to set the image format though.
6656         */
6657        if (rbd_dev->spec->image_id) {
6658                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6659
6660                return 0;
6661        }
6662
6663        /*
6664         * First, see if the format 2 image id file exists, and if
6665         * so, get the image's persistent id from it.
6666         */
6667        ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6668                               rbd_dev->spec->image_name);
6669        if (ret)
6670                return ret;
6671
6672        dout("rbd id object name is %s\n", oid.name);
6673
6674        /* Response will be an encoded string, which includes a length */
6675        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6676        response = kzalloc(size, GFP_NOIO);
6677        if (!response) {
6678                ret = -ENOMEM;
6679                goto out;
6680        }
6681
6682        /* If it doesn't exist we'll assume it's a format 1 image */
6683
6684        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6685                                  "get_id", NULL, 0,
6686                                  response, size);
6687        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6688        if (ret == -ENOENT) {
6689                image_id = kstrdup("", GFP_KERNEL);
6690                ret = image_id ? 0 : -ENOMEM;
6691                if (!ret)
6692                        rbd_dev->image_format = 1;
6693        } else if (ret >= 0) {
6694                void *p = response;
6695
6696                image_id = ceph_extract_encoded_string(&p, p + ret,
6697                                                NULL, GFP_NOIO);
6698                ret = PTR_ERR_OR_ZERO(image_id);
6699                if (!ret)
6700                        rbd_dev->image_format = 2;
6701        }
6702
6703        if (!ret) {
6704                rbd_dev->spec->image_id = image_id;
6705                dout("image_id is %s\n", image_id);
6706        }
6707out:
6708        kfree(response);
6709        ceph_oid_destroy(&oid);
6710        return ret;
6711}
6712
6713/*
6714 * Undo whatever state changes are made by v1 or v2 header info
6715 * call.
6716 */
6717static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6718{
6719        struct rbd_image_header *header;
6720
6721        rbd_dev_parent_put(rbd_dev);
6722        rbd_object_map_free(rbd_dev);
6723        rbd_dev_mapping_clear(rbd_dev);
6724
6725        /* Free dynamic fields from the header, then zero it out */
6726
6727        header = &rbd_dev->header;
6728        ceph_put_snap_context(header->snapc);
6729        kfree(header->snap_sizes);
6730        kfree(header->snap_names);
6731        kfree(header->object_prefix);
6732        memset(header, 0, sizeof (*header));
6733}
6734
6735static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6736{
6737        int ret;
6738
6739        ret = rbd_dev_v2_object_prefix(rbd_dev);
6740        if (ret)
6741                goto out_err;
6742
6743        /*
6744         * Get the and check features for the image.  Currently the
6745         * features are assumed to never change.
6746         */
6747        ret = rbd_dev_v2_features(rbd_dev);
6748        if (ret)
6749                goto out_err;
6750
6751        /* If the image supports fancy striping, get its parameters */
6752
6753        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6754                ret = rbd_dev_v2_striping_info(rbd_dev);
6755                if (ret < 0)
6756                        goto out_err;
6757        }
6758
6759        if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6760                ret = rbd_dev_v2_data_pool(rbd_dev);
6761                if (ret)
6762                        goto out_err;
6763        }
6764
6765        rbd_init_layout(rbd_dev);
6766        return 0;
6767
6768out_err:
6769        rbd_dev->header.features = 0;
6770        kfree(rbd_dev->header.object_prefix);
6771        rbd_dev->header.object_prefix = NULL;
6772        return ret;
6773}
6774
6775/*
6776 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6777 * rbd_dev_image_probe() recursion depth, which means it's also the
6778 * length of the already discovered part of the parent chain.
6779 */
6780static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6781{
6782        struct rbd_device *parent = NULL;
6783        int ret;
6784
6785        if (!rbd_dev->parent_spec)
6786                return 0;
6787
6788        if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6789                pr_info("parent chain is too long (%d)\n", depth);
6790                ret = -EINVAL;
6791                goto out_err;
6792        }
6793
6794        parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6795        if (!parent) {
6796                ret = -ENOMEM;
6797                goto out_err;
6798        }
6799
6800        /*
6801         * Images related by parent/child relationships always share
6802         * rbd_client and spec/parent_spec, so bump their refcounts.
6803         */
6804        __rbd_get_client(rbd_dev->rbd_client);
6805        rbd_spec_get(rbd_dev->parent_spec);
6806
6807        __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6808
6809        ret = rbd_dev_image_probe(parent, depth);
6810        if (ret < 0)
6811                goto out_err;
6812
6813        rbd_dev->parent = parent;
6814        atomic_set(&rbd_dev->parent_ref, 1);
6815        return 0;
6816
6817out_err:
6818        rbd_dev_unparent(rbd_dev);
6819        rbd_dev_destroy(parent);
6820        return ret;
6821}
6822
6823static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6824{
6825        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6826        rbd_free_disk(rbd_dev);
6827        if (!single_major)
6828                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6829}
6830
6831/*
6832 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6833 * upon return.
6834 */
6835static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6836{
6837        int ret;
6838
6839        /* Record our major and minor device numbers. */
6840
6841        if (!single_major) {
6842                ret = register_blkdev(0, rbd_dev->name);
6843                if (ret < 0)
6844                        goto err_out_unlock;
6845
6846                rbd_dev->major = ret;
6847                rbd_dev->minor = 0;
6848        } else {
6849                rbd_dev->major = rbd_major;
6850                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6851        }
6852
6853        /* Set up the blkdev mapping. */
6854
6855        ret = rbd_init_disk(rbd_dev);
6856        if (ret)
6857                goto err_out_blkdev;
6858
6859        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6860        set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6861
6862        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6863        if (ret)
6864                goto err_out_disk;
6865
6866        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6867        up_write(&rbd_dev->header_rwsem);
6868        return 0;
6869
6870err_out_disk:
6871        rbd_free_disk(rbd_dev);
6872err_out_blkdev:
6873        if (!single_major)
6874                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6875err_out_unlock:
6876        up_write(&rbd_dev->header_rwsem);
6877        return ret;
6878}
6879
6880static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6881{
6882        struct rbd_spec *spec = rbd_dev->spec;
6883        int ret;
6884
6885        /* Record the header object name for this rbd image. */
6886
6887        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6888        if (rbd_dev->image_format == 1)
6889                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6890                                       spec->image_name, RBD_SUFFIX);
6891        else
6892                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6893                                       RBD_HEADER_PREFIX, spec->image_id);
6894
6895        return ret;
6896}
6897
6898static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6899{
6900        if (!is_snap) {
6901                pr_info("image %s/%s%s%s does not exist\n",
6902                        rbd_dev->spec->pool_name,
6903                        rbd_dev->spec->pool_ns ?: "",
6904                        rbd_dev->spec->pool_ns ? "/" : "",
6905                        rbd_dev->spec->image_name);
6906        } else {
6907                pr_info("snap %s/%s%s%s@%s does not exist\n",
6908                        rbd_dev->spec->pool_name,
6909                        rbd_dev->spec->pool_ns ?: "",
6910                        rbd_dev->spec->pool_ns ? "/" : "",
6911                        rbd_dev->spec->image_name,
6912                        rbd_dev->spec->snap_name);
6913        }
6914}
6915
6916static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6917{
6918        if (!rbd_is_ro(rbd_dev))
6919                rbd_unregister_watch(rbd_dev);
6920
6921        rbd_dev_unprobe(rbd_dev);
6922        rbd_dev->image_format = 0;
6923        kfree(rbd_dev->spec->image_id);
6924        rbd_dev->spec->image_id = NULL;
6925}
6926
6927/*
6928 * Probe for the existence of the header object for the given rbd
6929 * device.  If this image is the one being mapped (i.e., not a
6930 * parent), initiate a watch on its header object before using that
6931 * object to get detailed information about the rbd image.
6932 *
6933 * On success, returns with header_rwsem held for write if called
6934 * with @depth == 0.
6935 */
6936static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6937{
6938        bool need_watch = !rbd_is_ro(rbd_dev);
6939        int ret;
6940
6941        /*
6942         * Get the id from the image id object.  Unless there's an
6943         * error, rbd_dev->spec->image_id will be filled in with
6944         * a dynamically-allocated string, and rbd_dev->image_format
6945         * will be set to either 1 or 2.
6946         */
6947        ret = rbd_dev_image_id(rbd_dev);
6948        if (ret)
6949                return ret;
6950
6951        ret = rbd_dev_header_name(rbd_dev);
6952        if (ret)
6953                goto err_out_format;
6954
6955        if (need_watch) {
6956                ret = rbd_register_watch(rbd_dev);
6957                if (ret) {
6958                        if (ret == -ENOENT)
6959                                rbd_print_dne(rbd_dev, false);
6960                        goto err_out_format;
6961                }
6962        }
6963
6964        if (!depth)
6965                down_write(&rbd_dev->header_rwsem);
6966
6967        ret = rbd_dev_header_info(rbd_dev);
6968        if (ret) {
6969                if (ret == -ENOENT && !need_watch)
6970                        rbd_print_dne(rbd_dev, false);
6971                goto err_out_probe;
6972        }
6973
6974        /*
6975         * If this image is the one being mapped, we have pool name and
6976         * id, image name and id, and snap name - need to fill snap id.
6977         * Otherwise this is a parent image, identified by pool, image
6978         * and snap ids - need to fill in names for those ids.
6979         */
6980        if (!depth)
6981                ret = rbd_spec_fill_snap_id(rbd_dev);
6982        else
6983                ret = rbd_spec_fill_names(rbd_dev);
6984        if (ret) {
6985                if (ret == -ENOENT)
6986                        rbd_print_dne(rbd_dev, true);
6987                goto err_out_probe;
6988        }
6989
6990        ret = rbd_dev_mapping_set(rbd_dev);
6991        if (ret)
6992                goto err_out_probe;
6993
6994        if (rbd_is_snap(rbd_dev) &&
6995            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6996                ret = rbd_object_map_load(rbd_dev);
6997                if (ret)
6998                        goto err_out_probe;
6999        }
7000
7001        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7002                ret = rbd_dev_v2_parent_info(rbd_dev);
7003                if (ret)
7004                        goto err_out_probe;
7005        }
7006
7007        ret = rbd_dev_probe_parent(rbd_dev, depth);
7008        if (ret)
7009                goto err_out_probe;
7010
7011        dout("discovered format %u image, header name is %s\n",
7012                rbd_dev->image_format, rbd_dev->header_oid.name);
7013        return 0;
7014
7015err_out_probe:
7016        if (!depth)
7017                up_write(&rbd_dev->header_rwsem);
7018        if (need_watch)
7019                rbd_unregister_watch(rbd_dev);
7020        rbd_dev_unprobe(rbd_dev);
7021err_out_format:
7022        rbd_dev->image_format = 0;
7023        kfree(rbd_dev->spec->image_id);
7024        rbd_dev->spec->image_id = NULL;
7025        return ret;
7026}
7027
7028static ssize_t do_rbd_add(struct bus_type *bus,
7029                          const char *buf,
7030                          size_t count)
7031{
7032        struct rbd_device *rbd_dev = NULL;
7033        struct ceph_options *ceph_opts = NULL;
7034        struct rbd_options *rbd_opts = NULL;
7035        struct rbd_spec *spec = NULL;
7036        struct rbd_client *rbdc;
7037        int rc;
7038
7039        if (!capable(CAP_SYS_ADMIN))
7040                return -EPERM;
7041
7042        if (!try_module_get(THIS_MODULE))
7043                return -ENODEV;
7044
7045        /* parse add command */
7046        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7047        if (rc < 0)
7048                goto out;
7049
7050        rbdc = rbd_get_client(ceph_opts);
7051        if (IS_ERR(rbdc)) {
7052                rc = PTR_ERR(rbdc);
7053                goto err_out_args;
7054        }
7055
7056        /* pick the pool */
7057        rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7058        if (rc < 0) {
7059                if (rc == -ENOENT)
7060                        pr_info("pool %s does not exist\n", spec->pool_name);
7061                goto err_out_client;
7062        }
7063        spec->pool_id = (u64)rc;
7064
7065        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7066        if (!rbd_dev) {
7067                rc = -ENOMEM;
7068                goto err_out_client;
7069        }
7070        rbdc = NULL;            /* rbd_dev now owns this */
7071        spec = NULL;            /* rbd_dev now owns this */
7072        rbd_opts = NULL;        /* rbd_dev now owns this */
7073
7074        /* if we are mapping a snapshot it will be a read-only mapping */
7075        if (rbd_dev->opts->read_only ||
7076            strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7077                __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7078
7079        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7080        if (!rbd_dev->config_info) {
7081                rc = -ENOMEM;
7082                goto err_out_rbd_dev;
7083        }
7084
7085        rc = rbd_dev_image_probe(rbd_dev, 0);
7086        if (rc < 0)
7087                goto err_out_rbd_dev;
7088
7089        if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7090                rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7091                         rbd_dev->layout.object_size);
7092                rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7093        }
7094
7095        rc = rbd_dev_device_setup(rbd_dev);
7096        if (rc)
7097                goto err_out_image_probe;
7098
7099        rc = rbd_add_acquire_lock(rbd_dev);
7100        if (rc)
7101                goto err_out_image_lock;
7102
7103        /* Everything's ready.  Announce the disk to the world. */
7104
7105        rc = device_add(&rbd_dev->dev);
7106        if (rc)
7107                goto err_out_image_lock;
7108
7109        device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7110        /* see rbd_init_disk() */
7111        blk_put_queue(rbd_dev->disk->queue);
7112
7113        spin_lock(&rbd_dev_list_lock);
7114        list_add_tail(&rbd_dev->node, &rbd_dev_list);
7115        spin_unlock(&rbd_dev_list_lock);
7116
7117        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7118                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7119                rbd_dev->header.features);
7120        rc = count;
7121out:
7122        module_put(THIS_MODULE);
7123        return rc;
7124
7125err_out_image_lock:
7126        rbd_dev_image_unlock(rbd_dev);
7127        rbd_dev_device_release(rbd_dev);
7128err_out_image_probe:
7129        rbd_dev_image_release(rbd_dev);
7130err_out_rbd_dev:
7131        rbd_dev_destroy(rbd_dev);
7132err_out_client:
7133        rbd_put_client(rbdc);
7134err_out_args:
7135        rbd_spec_put(spec);
7136        kfree(rbd_opts);
7137        goto out;
7138}
7139
7140static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7141{
7142        if (single_major)
7143                return -EINVAL;
7144
7145        return do_rbd_add(bus, buf, count);
7146}
7147
7148static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7149                                      size_t count)
7150{
7151        return do_rbd_add(bus, buf, count);
7152}
7153
7154static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7155{
7156        while (rbd_dev->parent) {
7157                struct rbd_device *first = rbd_dev;
7158                struct rbd_device *second = first->parent;
7159                struct rbd_device *third;
7160
7161                /*
7162                 * Follow to the parent with no grandparent and
7163                 * remove it.
7164                 */
7165                while (second && (third = second->parent)) {
7166                        first = second;
7167                        second = third;
7168                }
7169                rbd_assert(second);
7170                rbd_dev_image_release(second);
7171                rbd_dev_destroy(second);
7172                first->parent = NULL;
7173                first->parent_overlap = 0;
7174
7175                rbd_assert(first->parent_spec);
7176                rbd_spec_put(first->parent_spec);
7177                first->parent_spec = NULL;
7178        }
7179}
7180
7181static ssize_t do_rbd_remove(struct bus_type *bus,
7182                             const char *buf,
7183                             size_t count)
7184{
7185        struct rbd_device *rbd_dev = NULL;
7186        struct list_head *tmp;
7187        int dev_id;
7188        char opt_buf[6];
7189        bool force = false;
7190        int ret;
7191
7192        if (!capable(CAP_SYS_ADMIN))
7193                return -EPERM;
7194
7195        dev_id = -1;
7196        opt_buf[0] = '\0';
7197        sscanf(buf, "%d %5s", &dev_id, opt_buf);
7198        if (dev_id < 0) {
7199                pr_err("dev_id out of range\n");
7200                return -EINVAL;
7201        }
7202        if (opt_buf[0] != '\0') {
7203                if (!strcmp(opt_buf, "force")) {
7204                        force = true;
7205                } else {
7206                        pr_err("bad remove option at '%s'\n", opt_buf);
7207                        return -EINVAL;
7208                }
7209        }
7210
7211        ret = -ENOENT;
7212        spin_lock(&rbd_dev_list_lock);
7213        list_for_each(tmp, &rbd_dev_list) {
7214                rbd_dev = list_entry(tmp, struct rbd_device, node);
7215                if (rbd_dev->dev_id == dev_id) {
7216                        ret = 0;
7217                        break;
7218                }
7219        }
7220        if (!ret) {
7221                spin_lock_irq(&rbd_dev->lock);
7222                if (rbd_dev->open_count && !force)
7223                        ret = -EBUSY;
7224                else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7225                                          &rbd_dev->flags))
7226                        ret = -EINPROGRESS;
7227                spin_unlock_irq(&rbd_dev->lock);
7228        }
7229        spin_unlock(&rbd_dev_list_lock);
7230        if (ret)
7231                return ret;
7232
7233        if (force) {
7234                /*
7235                 * Prevent new IO from being queued and wait for existing
7236                 * IO to complete/fail.
7237                 */
7238                blk_mq_freeze_queue(rbd_dev->disk->queue);
7239                blk_set_queue_dying(rbd_dev->disk->queue);
7240        }
7241
7242        del_gendisk(rbd_dev->disk);
7243        spin_lock(&rbd_dev_list_lock);
7244        list_del_init(&rbd_dev->node);
7245        spin_unlock(&rbd_dev_list_lock);
7246        device_del(&rbd_dev->dev);
7247
7248        rbd_dev_image_unlock(rbd_dev);
7249        rbd_dev_device_release(rbd_dev);
7250        rbd_dev_image_release(rbd_dev);
7251        rbd_dev_destroy(rbd_dev);
7252        return count;
7253}
7254
7255static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7256{
7257        if (single_major)
7258                return -EINVAL;
7259
7260        return do_rbd_remove(bus, buf, count);
7261}
7262
7263static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7264                                         size_t count)
7265{
7266        return do_rbd_remove(bus, buf, count);
7267}
7268
7269/*
7270 * create control files in sysfs
7271 * /sys/bus/rbd/...
7272 */
7273static int __init rbd_sysfs_init(void)
7274{
7275        int ret;
7276
7277        ret = device_register(&rbd_root_dev);
7278        if (ret < 0)
7279                return ret;
7280
7281        ret = bus_register(&rbd_bus_type);
7282        if (ret < 0)
7283                device_unregister(&rbd_root_dev);
7284
7285        return ret;
7286}
7287
7288static void __exit rbd_sysfs_cleanup(void)
7289{
7290        bus_unregister(&rbd_bus_type);
7291        device_unregister(&rbd_root_dev);
7292}
7293
7294static int __init rbd_slab_init(void)
7295{
7296        rbd_assert(!rbd_img_request_cache);
7297        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7298        if (!rbd_img_request_cache)
7299                return -ENOMEM;
7300
7301        rbd_assert(!rbd_obj_request_cache);
7302        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7303        if (!rbd_obj_request_cache)
7304                goto out_err;
7305
7306        return 0;
7307
7308out_err:
7309        kmem_cache_destroy(rbd_img_request_cache);
7310        rbd_img_request_cache = NULL;
7311        return -ENOMEM;
7312}
7313
7314static void rbd_slab_exit(void)
7315{
7316        rbd_assert(rbd_obj_request_cache);
7317        kmem_cache_destroy(rbd_obj_request_cache);
7318        rbd_obj_request_cache = NULL;
7319
7320        rbd_assert(rbd_img_request_cache);
7321        kmem_cache_destroy(rbd_img_request_cache);
7322        rbd_img_request_cache = NULL;
7323}
7324
7325static int __init rbd_init(void)
7326{
7327        int rc;
7328
7329        if (!libceph_compatible(NULL)) {
7330                rbd_warn(NULL, "libceph incompatibility (quitting)");
7331                return -EINVAL;
7332        }
7333
7334        rc = rbd_slab_init();
7335        if (rc)
7336                return rc;
7337
7338        /*
7339         * The number of active work items is limited by the number of
7340         * rbd devices * queue depth, so leave @max_active at default.
7341         */
7342        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7343        if (!rbd_wq) {
7344                rc = -ENOMEM;
7345                goto err_out_slab;
7346        }
7347
7348        if (single_major) {
7349                rbd_major = register_blkdev(0, RBD_DRV_NAME);
7350                if (rbd_major < 0) {
7351                        rc = rbd_major;
7352                        goto err_out_wq;
7353                }
7354        }
7355
7356        rc = rbd_sysfs_init();
7357        if (rc)
7358                goto err_out_blkdev;
7359
7360        if (single_major)
7361                pr_info("loaded (major %d)\n", rbd_major);
7362        else
7363                pr_info("loaded\n");
7364
7365        return 0;
7366
7367err_out_blkdev:
7368        if (single_major)
7369                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7370err_out_wq:
7371        destroy_workqueue(rbd_wq);
7372err_out_slab:
7373        rbd_slab_exit();
7374        return rc;
7375}
7376
7377static void __exit rbd_exit(void)
7378{
7379        ida_destroy(&rbd_dev_id_ida);
7380        rbd_sysfs_cleanup();
7381        if (single_major)
7382                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7383        destroy_workqueue(rbd_wq);
7384        rbd_slab_exit();
7385}
7386
7387module_init(rbd_init);
7388module_exit(rbd_exit);
7389
7390MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7391MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7392MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7393/* following authorship retained from original osdblk.c */
7394MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7395
7396MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7397MODULE_LICENSE("GPL");
7398