linux/drivers/block/rbd.c
<<
>>
Prefs
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/cls_lock_client.h>
  35#include <linux/ceph/striper.h>
  36#include <linux/ceph/decode.h>
  37#include <linux/parser.h>
  38#include <linux/bsearch.h>
  39
  40#include <linux/kernel.h>
  41#include <linux/device.h>
  42#include <linux/module.h>
  43#include <linux/blk-mq.h>
  44#include <linux/fs.h>
  45#include <linux/blkdev.h>
  46#include <linux/slab.h>
  47#include <linux/idr.h>
  48#include <linux/workqueue.h>
  49
  50#include "rbd_types.h"
  51
  52#define RBD_DEBUG       /* Activate rbd_assert() calls */
  53
  54/*
  55 * Increment the given counter and return its updated value.
  56 * If the counter is already 0 it will not be incremented.
  57 * If the counter is already at its maximum value returns
  58 * -EINVAL without updating it.
  59 */
  60static int atomic_inc_return_safe(atomic_t *v)
  61{
  62        unsigned int counter;
  63
  64        counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
  65        if (counter <= (unsigned int)INT_MAX)
  66                return (int)counter;
  67
  68        atomic_dec(v);
  69
  70        return -EINVAL;
  71}
  72
  73/* Decrement the counter.  Return the resulting value, or -EINVAL */
  74static int atomic_dec_return_safe(atomic_t *v)
  75{
  76        int counter;
  77
  78        counter = atomic_dec_return(v);
  79        if (counter >= 0)
  80                return counter;
  81
  82        atomic_inc(v);
  83
  84        return -EINVAL;
  85}
  86
  87#define RBD_DRV_NAME "rbd"
  88
  89#define RBD_MINORS_PER_MAJOR            256
  90#define RBD_SINGLE_MAJOR_PART_SHIFT     4
  91
  92#define RBD_MAX_PARENT_CHAIN_LEN        16
  93
  94#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
  95#define RBD_MAX_SNAP_NAME_LEN   \
  96                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
  97
  98#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
  99
 100#define RBD_SNAP_HEAD_NAME      "-"
 101
 102#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
 103
 104/* This allows a single page to hold an image name sent by OSD */
 105#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
 106#define RBD_IMAGE_ID_LEN_MAX    64
 107
 108#define RBD_OBJ_PREFIX_LEN_MAX  64
 109
 110#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
 111#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
 112
 113/* Feature bits */
 114
 115#define RBD_FEATURE_LAYERING            (1ULL<<0)
 116#define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
 117#define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
 118#define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
 119#define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
 120#define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
 121#define RBD_FEATURE_DATA_POOL           (1ULL<<7)
 122#define RBD_FEATURE_OPERATIONS          (1ULL<<8)
 123
 124#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
 125                                 RBD_FEATURE_STRIPINGV2 |       \
 126                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
 127                                 RBD_FEATURE_OBJECT_MAP |       \
 128                                 RBD_FEATURE_FAST_DIFF |        \
 129                                 RBD_FEATURE_DEEP_FLATTEN |     \
 130                                 RBD_FEATURE_DATA_POOL |        \
 131                                 RBD_FEATURE_OPERATIONS)
 132
 133/* Features supported by this (client software) implementation. */
 134
 135#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
 136
 137/*
 138 * An RBD device name will be "rbd#", where the "rbd" comes from
 139 * RBD_DRV_NAME above, and # is a unique integer identifier.
 140 */
 141#define DEV_NAME_LEN            32
 142
 143/*
 144 * block device image metadata (in-memory version)
 145 */
 146struct rbd_image_header {
 147        /* These six fields never change for a given rbd image */
 148        char *object_prefix;
 149        __u8 obj_order;
 150        u64 stripe_unit;
 151        u64 stripe_count;
 152        s64 data_pool_id;
 153        u64 features;           /* Might be changeable someday? */
 154
 155        /* The remaining fields need to be updated occasionally */
 156        u64 image_size;
 157        struct ceph_snap_context *snapc;
 158        char *snap_names;       /* format 1 only */
 159        u64 *snap_sizes;        /* format 1 only */
 160};
 161
 162/*
 163 * An rbd image specification.
 164 *
 165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 166 * identify an image.  Each rbd_dev structure includes a pointer to
 167 * an rbd_spec structure that encapsulates this identity.
 168 *
 169 * Each of the id's in an rbd_spec has an associated name.  For a
 170 * user-mapped image, the names are supplied and the id's associated
 171 * with them are looked up.  For a layered image, a parent image is
 172 * defined by the tuple, and the names are looked up.
 173 *
 174 * An rbd_dev structure contains a parent_spec pointer which is
 175 * non-null if the image it represents is a child in a layered
 176 * image.  This pointer will refer to the rbd_spec structure used
 177 * by the parent rbd_dev for its own identity (i.e., the structure
 178 * is shared between the parent and child).
 179 *
 180 * Since these structures are populated once, during the discovery
 181 * phase of image construction, they are effectively immutable so
 182 * we make no effort to synchronize access to them.
 183 *
 184 * Note that code herein does not assume the image name is known (it
 185 * could be a null pointer).
 186 */
 187struct rbd_spec {
 188        u64             pool_id;
 189        const char      *pool_name;
 190        const char      *pool_ns;       /* NULL if default, never "" */
 191
 192        const char      *image_id;
 193        const char      *image_name;
 194
 195        u64             snap_id;
 196        const char      *snap_name;
 197
 198        struct kref     kref;
 199};
 200
 201/*
 202 * an instance of the client.  multiple devices may share an rbd client.
 203 */
 204struct rbd_client {
 205        struct ceph_client      *client;
 206        struct kref             kref;
 207        struct list_head        node;
 208};
 209
 210struct pending_result {
 211        int                     result;         /* first nonzero result */
 212        int                     num_pending;
 213};
 214
 215struct rbd_img_request;
 216
 217enum obj_request_type {
 218        OBJ_REQUEST_NODATA = 1,
 219        OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
 220        OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
 221        OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
 222};
 223
 224enum obj_operation_type {
 225        OBJ_OP_READ = 1,
 226        OBJ_OP_WRITE,
 227        OBJ_OP_DISCARD,
 228        OBJ_OP_ZEROOUT,
 229};
 230
 231#define RBD_OBJ_FLAG_DELETION                   (1U << 0)
 232#define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
 233#define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
 234#define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
 235#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
 236
 237enum rbd_obj_read_state {
 238        RBD_OBJ_READ_START = 1,
 239        RBD_OBJ_READ_OBJECT,
 240        RBD_OBJ_READ_PARENT,
 241};
 242
 243/*
 244 * Writes go through the following state machine to deal with
 245 * layering:
 246 *
 247 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 248 *            .                 |                                    .
 249 *            .                 v                                    .
 250 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 251 *            .                 |                    .               .
 252 *            .                 v                    v (deep-copyup  .
 253 *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
 254 * flattened) v                 |                    .               .
 255 *            .                 v                    .               .
 256 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 257 *                              |                        not needed) v
 258 *                              v                                    .
 259 *                            done . . . . . . . . . . . . . . . . . .
 260 *                              ^
 261 *                              |
 262 *                     RBD_OBJ_WRITE_FLAT
 263 *
 264 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 265 * assert_exists guard is needed or not (in some cases it's not needed
 266 * even if there is a parent).
 267 */
 268enum rbd_obj_write_state {
 269        RBD_OBJ_WRITE_START = 1,
 270        RBD_OBJ_WRITE_PRE_OBJECT_MAP,
 271        RBD_OBJ_WRITE_OBJECT,
 272        __RBD_OBJ_WRITE_COPYUP,
 273        RBD_OBJ_WRITE_COPYUP,
 274        RBD_OBJ_WRITE_POST_OBJECT_MAP,
 275};
 276
 277enum rbd_obj_copyup_state {
 278        RBD_OBJ_COPYUP_START = 1,
 279        RBD_OBJ_COPYUP_READ_PARENT,
 280        __RBD_OBJ_COPYUP_OBJECT_MAPS,
 281        RBD_OBJ_COPYUP_OBJECT_MAPS,
 282        __RBD_OBJ_COPYUP_WRITE_OBJECT,
 283        RBD_OBJ_COPYUP_WRITE_OBJECT,
 284};
 285
 286struct rbd_obj_request {
 287        struct ceph_object_extent ex;
 288        unsigned int            flags;  /* RBD_OBJ_FLAG_* */
 289        union {
 290                enum rbd_obj_read_state  read_state;    /* for reads */
 291                enum rbd_obj_write_state write_state;   /* for writes */
 292        };
 293
 294        struct rbd_img_request  *img_request;
 295        struct ceph_file_extent *img_extents;
 296        u32                     num_img_extents;
 297
 298        union {
 299                struct ceph_bio_iter    bio_pos;
 300                struct {
 301                        struct ceph_bvec_iter   bvec_pos;
 302                        u32                     bvec_count;
 303                        u32                     bvec_idx;
 304                };
 305        };
 306
 307        enum rbd_obj_copyup_state copyup_state;
 308        struct bio_vec          *copyup_bvecs;
 309        u32                     copyup_bvec_count;
 310
 311        struct list_head        osd_reqs;       /* w/ r_private_item */
 312
 313        struct mutex            state_mutex;
 314        struct pending_result   pending;
 315        struct kref             kref;
 316};
 317
 318enum img_req_flags {
 319        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
 320        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 321};
 322
 323enum rbd_img_state {
 324        RBD_IMG_START = 1,
 325        RBD_IMG_EXCLUSIVE_LOCK,
 326        __RBD_IMG_OBJECT_REQUESTS,
 327        RBD_IMG_OBJECT_REQUESTS,
 328};
 329
 330struct rbd_img_request {
 331        struct rbd_device       *rbd_dev;
 332        enum obj_operation_type op_type;
 333        enum obj_request_type   data_type;
 334        unsigned long           flags;
 335        enum rbd_img_state      state;
 336        union {
 337                u64                     snap_id;        /* for reads */
 338                struct ceph_snap_context *snapc;        /* for writes */
 339        };
 340        struct rbd_obj_request  *obj_request;   /* obj req initiator */
 341
 342        struct list_head        lock_item;
 343        struct list_head        object_extents; /* obj_req.ex structs */
 344
 345        struct mutex            state_mutex;
 346        struct pending_result   pending;
 347        struct work_struct      work;
 348        int                     work_result;
 349};
 350
 351#define for_each_obj_request(ireq, oreq) \
 352        list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 353#define for_each_obj_request_safe(ireq, oreq, n) \
 354        list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 355
 356enum rbd_watch_state {
 357        RBD_WATCH_STATE_UNREGISTERED,
 358        RBD_WATCH_STATE_REGISTERED,
 359        RBD_WATCH_STATE_ERROR,
 360};
 361
 362enum rbd_lock_state {
 363        RBD_LOCK_STATE_UNLOCKED,
 364        RBD_LOCK_STATE_LOCKED,
 365        RBD_LOCK_STATE_RELEASING,
 366};
 367
 368/* WatchNotify::ClientId */
 369struct rbd_client_id {
 370        u64 gid;
 371        u64 handle;
 372};
 373
 374struct rbd_mapping {
 375        u64                     size;
 376};
 377
 378/*
 379 * a single device
 380 */
 381struct rbd_device {
 382        int                     dev_id;         /* blkdev unique id */
 383
 384        int                     major;          /* blkdev assigned major */
 385        int                     minor;
 386        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 387
 388        u32                     image_format;   /* Either 1 or 2 */
 389        struct rbd_client       *rbd_client;
 390
 391        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 392
 393        spinlock_t              lock;           /* queue, flags, open_count */
 394
 395        struct rbd_image_header header;
 396        unsigned long           flags;          /* possibly lock protected */
 397        struct rbd_spec         *spec;
 398        struct rbd_options      *opts;
 399        char                    *config_info;   /* add{,_single_major} string */
 400
 401        struct ceph_object_id   header_oid;
 402        struct ceph_object_locator header_oloc;
 403
 404        struct ceph_file_layout layout;         /* used for all rbd requests */
 405
 406        struct mutex            watch_mutex;
 407        enum rbd_watch_state    watch_state;
 408        struct ceph_osd_linger_request *watch_handle;
 409        u64                     watch_cookie;
 410        struct delayed_work     watch_dwork;
 411
 412        struct rw_semaphore     lock_rwsem;
 413        enum rbd_lock_state     lock_state;
 414        char                    lock_cookie[32];
 415        struct rbd_client_id    owner_cid;
 416        struct work_struct      acquired_lock_work;
 417        struct work_struct      released_lock_work;
 418        struct delayed_work     lock_dwork;
 419        struct work_struct      unlock_work;
 420        spinlock_t              lock_lists_lock;
 421        struct list_head        acquiring_list;
 422        struct list_head        running_list;
 423        struct completion       acquire_wait;
 424        int                     acquire_err;
 425        struct completion       releasing_wait;
 426
 427        spinlock_t              object_map_lock;
 428        u8                      *object_map;
 429        u64                     object_map_size;        /* in objects */
 430        u64                     object_map_flags;
 431
 432        struct workqueue_struct *task_wq;
 433
 434        struct rbd_spec         *parent_spec;
 435        u64                     parent_overlap;
 436        atomic_t                parent_ref;
 437        struct rbd_device       *parent;
 438
 439        /* Block layer tags. */
 440        struct blk_mq_tag_set   tag_set;
 441
 442        /* protects updating the header */
 443        struct rw_semaphore     header_rwsem;
 444
 445        struct rbd_mapping      mapping;
 446
 447        struct list_head        node;
 448
 449        /* sysfs related */
 450        struct device           dev;
 451        unsigned long           open_count;     /* protected by lock */
 452};
 453
 454/*
 455 * Flag bits for rbd_dev->flags:
 456 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 457 *   by rbd_dev->lock
 458 */
 459enum rbd_dev_flags {
 460        RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
 461        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 462        RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
 463};
 464
 465static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 466
 467static LIST_HEAD(rbd_dev_list);    /* devices */
 468static DEFINE_SPINLOCK(rbd_dev_list_lock);
 469
 470static LIST_HEAD(rbd_client_list);              /* clients */
 471static DEFINE_SPINLOCK(rbd_client_list_lock);
 472
 473/* Slab caches for frequently-allocated structures */
 474
 475static struct kmem_cache        *rbd_img_request_cache;
 476static struct kmem_cache        *rbd_obj_request_cache;
 477
 478static int rbd_major;
 479static DEFINE_IDA(rbd_dev_id_ida);
 480
 481static struct workqueue_struct *rbd_wq;
 482
 483static struct ceph_snap_context rbd_empty_snapc = {
 484        .nref = REFCOUNT_INIT(1),
 485};
 486
 487/*
 488 * single-major requires >= 0.75 version of userspace rbd utility.
 489 */
 490static bool single_major = true;
 491module_param(single_major, bool, 0444);
 492MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 493
 494static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 495                       size_t count);
 496static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 497                          size_t count);
 498static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
 499                                    size_t count);
 500static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 501                                       size_t count);
 502static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 503
 504static int rbd_dev_id_to_minor(int dev_id)
 505{
 506        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
 507}
 508
 509static int minor_to_rbd_dev_id(int minor)
 510{
 511        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 512}
 513
 514static bool rbd_is_ro(struct rbd_device *rbd_dev)
 515{
 516        return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
 517}
 518
 519static bool rbd_is_snap(struct rbd_device *rbd_dev)
 520{
 521        return rbd_dev->spec->snap_id != CEPH_NOSNAP;
 522}
 523
 524static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 525{
 526        lockdep_assert_held(&rbd_dev->lock_rwsem);
 527
 528        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 529               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 530}
 531
 532static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
 533{
 534        bool is_lock_owner;
 535
 536        down_read(&rbd_dev->lock_rwsem);
 537        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
 538        up_read(&rbd_dev->lock_rwsem);
 539        return is_lock_owner;
 540}
 541
 542static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
 543{
 544        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 545}
 546
 547static BUS_ATTR(add, 0200, NULL, rbd_add);
 548static BUS_ATTR(remove, 0200, NULL, rbd_remove);
 549static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
 550static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
 551static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);
 552
 553static struct attribute *rbd_bus_attrs[] = {
 554        &bus_attr_add.attr,
 555        &bus_attr_remove.attr,
 556        &bus_attr_add_single_major.attr,
 557        &bus_attr_remove_single_major.attr,
 558        &bus_attr_supported_features.attr,
 559        NULL,
 560};
 561
 562static umode_t rbd_bus_is_visible(struct kobject *kobj,
 563                                  struct attribute *attr, int index)
 564{
 565        if (!single_major &&
 566            (attr == &bus_attr_add_single_major.attr ||
 567             attr == &bus_attr_remove_single_major.attr))
 568                return 0;
 569
 570        return attr->mode;
 571}
 572
 573static const struct attribute_group rbd_bus_group = {
 574        .attrs = rbd_bus_attrs,
 575        .is_visible = rbd_bus_is_visible,
 576};
 577__ATTRIBUTE_GROUPS(rbd_bus);
 578
 579static struct bus_type rbd_bus_type = {
 580        .name           = "rbd",
 581        .bus_groups     = rbd_bus_groups,
 582};
 583
 584static void rbd_root_dev_release(struct device *dev)
 585{
 586}
 587
 588static struct device rbd_root_dev = {
 589        .init_name =    "rbd",
 590        .release =      rbd_root_dev_release,
 591};
 592
 593static __printf(2, 3)
 594void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 595{
 596        struct va_format vaf;
 597        va_list args;
 598
 599        va_start(args, fmt);
 600        vaf.fmt = fmt;
 601        vaf.va = &args;
 602
 603        if (!rbd_dev)
 604                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 605        else if (rbd_dev->disk)
 606                printk(KERN_WARNING "%s: %s: %pV\n",
 607                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 608        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 609                printk(KERN_WARNING "%s: image %s: %pV\n",
 610                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 611        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 612                printk(KERN_WARNING "%s: id %s: %pV\n",
 613                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 614        else    /* punt */
 615                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 616                        RBD_DRV_NAME, rbd_dev, &vaf);
 617        va_end(args);
 618}
 619
 620#ifdef RBD_DEBUG
 621#define rbd_assert(expr)                                                \
 622                if (unlikely(!(expr))) {                                \
 623                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 624                                                "at line %d:\n\n"       \
 625                                        "\trbd_assert(%s);\n\n",        \
 626                                        __func__, __LINE__, #expr);     \
 627                        BUG();                                          \
 628                }
 629#else /* !RBD_DEBUG */
 630#  define rbd_assert(expr)      ((void) 0)
 631#endif /* !RBD_DEBUG */
 632
 633static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 634
 635static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 636static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
 637static int rbd_dev_header_info(struct rbd_device *rbd_dev);
 638static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 639static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 640                                        u64 snap_id);
 641static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 642                                u8 *order, u64 *snap_size);
 643static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
 644
 645static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
 646static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
 647
 648/*
 649 * Return true if nothing else is pending.
 650 */
 651static bool pending_result_dec(struct pending_result *pending, int *result)
 652{
 653        rbd_assert(pending->num_pending > 0);
 654
 655        if (*result && !pending->result)
 656                pending->result = *result;
 657        if (--pending->num_pending)
 658                return false;
 659
 660        *result = pending->result;
 661        return true;
 662}
 663
 664static int rbd_open(struct block_device *bdev, fmode_t mode)
 665{
 666        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 667        bool removing = false;
 668
 669        spin_lock_irq(&rbd_dev->lock);
 670        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 671                removing = true;
 672        else
 673                rbd_dev->open_count++;
 674        spin_unlock_irq(&rbd_dev->lock);
 675        if (removing)
 676                return -ENOENT;
 677
 678        (void) get_device(&rbd_dev->dev);
 679
 680        return 0;
 681}
 682
 683static void rbd_release(struct gendisk *disk, fmode_t mode)
 684{
 685        struct rbd_device *rbd_dev = disk->private_data;
 686        unsigned long open_count_before;
 687
 688        spin_lock_irq(&rbd_dev->lock);
 689        open_count_before = rbd_dev->open_count--;
 690        spin_unlock_irq(&rbd_dev->lock);
 691        rbd_assert(open_count_before > 0);
 692
 693        put_device(&rbd_dev->dev);
 694}
 695
 696static int rbd_set_read_only(struct block_device *bdev, bool ro)
 697{
 698        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 699
 700        /*
 701         * Both images mapped read-only and snapshots can't be marked
 702         * read-write.
 703         */
 704        if (!ro) {
 705                if (rbd_is_ro(rbd_dev))
 706                        return -EROFS;
 707
 708                rbd_assert(!rbd_is_snap(rbd_dev));
 709        }
 710
 711        return 0;
 712}
 713
 714static const struct block_device_operations rbd_bd_ops = {
 715        .owner                  = THIS_MODULE,
 716        .open                   = rbd_open,
 717        .release                = rbd_release,
 718        .set_read_only          = rbd_set_read_only,
 719};
 720
 721/*
 722 * Initialize an rbd client instance.  Success or not, this function
 723 * consumes ceph_opts.  Caller holds client_mutex.
 724 */
 725static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 726{
 727        struct rbd_client *rbdc;
 728        int ret = -ENOMEM;
 729
 730        dout("%s:\n", __func__);
 731        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 732        if (!rbdc)
 733                goto out_opt;
 734
 735        kref_init(&rbdc->kref);
 736        INIT_LIST_HEAD(&rbdc->node);
 737
 738        rbdc->client = ceph_create_client(ceph_opts, rbdc);
 739        if (IS_ERR(rbdc->client))
 740                goto out_rbdc;
 741        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 742
 743        ret = ceph_open_session(rbdc->client);
 744        if (ret < 0)
 745                goto out_client;
 746
 747        spin_lock(&rbd_client_list_lock);
 748        list_add_tail(&rbdc->node, &rbd_client_list);
 749        spin_unlock(&rbd_client_list_lock);
 750
 751        dout("%s: rbdc %p\n", __func__, rbdc);
 752
 753        return rbdc;
 754out_client:
 755        ceph_destroy_client(rbdc->client);
 756out_rbdc:
 757        kfree(rbdc);
 758out_opt:
 759        if (ceph_opts)
 760                ceph_destroy_options(ceph_opts);
 761        dout("%s: error %d\n", __func__, ret);
 762
 763        return ERR_PTR(ret);
 764}
 765
 766static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 767{
 768        kref_get(&rbdc->kref);
 769
 770        return rbdc;
 771}
 772
 773/*
 774 * Find a ceph client with specific addr and configuration.  If
 775 * found, bump its reference count.
 776 */
 777static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 778{
 779        struct rbd_client *client_node;
 780        bool found = false;
 781
 782        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 783                return NULL;
 784
 785        spin_lock(&rbd_client_list_lock);
 786        list_for_each_entry(client_node, &rbd_client_list, node) {
 787                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 788                        __rbd_get_client(client_node);
 789
 790                        found = true;
 791                        break;
 792                }
 793        }
 794        spin_unlock(&rbd_client_list_lock);
 795
 796        return found ? client_node : NULL;
 797}
 798
 799/*
 800 * (Per device) rbd map options
 801 */
 802enum {
 803        Opt_queue_depth,
 804        Opt_alloc_size,
 805        Opt_lock_timeout,
 806        Opt_last_int,
 807        /* int args above */
 808        Opt_pool_ns,
 809        Opt_compression_hint,
 810        Opt_last_string,
 811        /* string args above */
 812        Opt_read_only,
 813        Opt_read_write,
 814        Opt_lock_on_read,
 815        Opt_exclusive,
 816        Opt_notrim,
 817        Opt_err
 818};
 819
 820static match_table_t rbd_opts_tokens = {
 821        {Opt_queue_depth, "queue_depth=%d"},
 822        {Opt_alloc_size, "alloc_size=%d"},
 823        {Opt_lock_timeout, "lock_timeout=%d"},
 824        /* int args above */
 825        {Opt_pool_ns, "_pool_ns=%s"},
 826        {Opt_compression_hint, "compression_hint=%s"},
 827        /* string args above */
 828        {Opt_read_only, "read_only"},
 829        {Opt_read_only, "ro"},          /* Alternate spelling */
 830        {Opt_read_write, "read_write"},
 831        {Opt_read_write, "rw"},         /* Alternate spelling */
 832        {Opt_lock_on_read, "lock_on_read"},
 833        {Opt_exclusive, "exclusive"},
 834        {Opt_notrim, "notrim"},
 835        {Opt_err, NULL}
 836};
 837
 838struct rbd_options {
 839        int     queue_depth;
 840        int     alloc_size;
 841        unsigned long   lock_timeout;
 842        bool    read_only;
 843        bool    lock_on_read;
 844        bool    exclusive;
 845        bool    trim;
 846
 847        u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 848};
 849
 850#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
 851#define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
 852#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
 853#define RBD_READ_ONLY_DEFAULT   false
 854#define RBD_LOCK_ON_READ_DEFAULT false
 855#define RBD_EXCLUSIVE_DEFAULT   false
 856#define RBD_TRIM_DEFAULT        true
 857
 858struct parse_rbd_opts_ctx {
 859        struct rbd_spec         *spec;
 860        struct rbd_options      *opts;
 861};
 862
 863static int parse_rbd_opts_token(char *c, void *private)
 864{
 865        struct parse_rbd_opts_ctx *pctx = private;
 866        substring_t argstr[MAX_OPT_ARGS];
 867        int token, intval, ret;
 868
 869        token = match_token(c, rbd_opts_tokens, argstr);
 870        if (token < Opt_last_int) {
 871                ret = match_int(&argstr[0], &intval);
 872                if (ret < 0) {
 873                        pr_err("bad option arg (not int) at '%s'\n", c);
 874                        return ret;
 875                }
 876                dout("got int token %d val %d\n", token, intval);
 877        } else if (token > Opt_last_int && token < Opt_last_string) {
 878                dout("got string token %d val %s\n", token, argstr[0].from);
 879        } else {
 880                dout("got token %d\n", token);
 881        }
 882
 883        switch (token) {
 884        case Opt_queue_depth:
 885                if (intval < 1) {
 886                        pr_err("queue_depth out of range\n");
 887                        return -EINVAL;
 888                }
 889                pctx->opts->queue_depth = intval;
 890                break;
 891        case Opt_alloc_size:
 892                if (intval < SECTOR_SIZE) {
 893                        pr_err("alloc_size out of range\n");
 894                        return -EINVAL;
 895                }
 896                if (!is_power_of_2(intval)) {
 897                        pr_err("alloc_size must be a power of 2\n");
 898                        return -EINVAL;
 899                }
 900                pctx->opts->alloc_size = intval;
 901                break;
 902        case Opt_lock_timeout:
 903                /* 0 is "wait forever" (i.e. infinite timeout) */
 904                if (intval < 0 || intval > INT_MAX / 1000) {
 905                        pr_err("lock_timeout out of range\n");
 906                        return -EINVAL;
 907                }
 908                pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
 909                break;
 910        case Opt_pool_ns:
 911                kfree(pctx->spec->pool_ns);
 912                pctx->spec->pool_ns = match_strdup(argstr);
 913                if (!pctx->spec->pool_ns)
 914                        return -ENOMEM;
 915                break;
 916        case Opt_compression_hint:
 917                if (!strcmp(argstr[0].from, "none")) {
 918                        pctx->opts->alloc_hint_flags &=
 919                            ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
 920                              CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
 921                } else if (!strcmp(argstr[0].from, "compressible")) {
 922                        pctx->opts->alloc_hint_flags |=
 923                            CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
 924                        pctx->opts->alloc_hint_flags &=
 925                            ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
 926                } else if (!strcmp(argstr[0].from, "incompressible")) {
 927                        pctx->opts->alloc_hint_flags |=
 928                            CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
 929                        pctx->opts->alloc_hint_flags &=
 930                            ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
 931                } else {
 932                        return -EINVAL;
 933                }
 934                break;
 935        case Opt_read_only:
 936                pctx->opts->read_only = true;
 937                break;
 938        case Opt_read_write:
 939                pctx->opts->read_only = false;
 940                break;
 941        case Opt_lock_on_read:
 942                pctx->opts->lock_on_read = true;
 943                break;
 944        case Opt_exclusive:
 945                pctx->opts->exclusive = true;
 946                break;
 947        case Opt_notrim:
 948                pctx->opts->trim = false;
 949                break;
 950        default:
 951                /* libceph prints "bad option" msg */
 952                return -EINVAL;
 953        }
 954
 955        return 0;
 956}
 957
 958static char* obj_op_name(enum obj_operation_type op_type)
 959{
 960        switch (op_type) {
 961        case OBJ_OP_READ:
 962                return "read";
 963        case OBJ_OP_WRITE:
 964                return "write";
 965        case OBJ_OP_DISCARD:
 966                return "discard";
 967        case OBJ_OP_ZEROOUT:
 968                return "zeroout";
 969        default:
 970                return "???";
 971        }
 972}
 973
 974/*
 975 * Destroy ceph client
 976 *
 977 * Caller must hold rbd_client_list_lock.
 978 */
 979static void rbd_client_release(struct kref *kref)
 980{
 981        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 982
 983        dout("%s: rbdc %p\n", __func__, rbdc);
 984        spin_lock(&rbd_client_list_lock);
 985        list_del(&rbdc->node);
 986        spin_unlock(&rbd_client_list_lock);
 987
 988        ceph_destroy_client(rbdc->client);
 989        kfree(rbdc);
 990}
 991
 992/*
 993 * Drop reference to ceph client node. If it's not referenced anymore, release
 994 * it.
 995 */
 996static void rbd_put_client(struct rbd_client *rbdc)
 997{
 998        if (rbdc)
 999                kref_put(&rbdc->kref, rbd_client_release);
1000}
1001
1002/*
1003 * Get a ceph client with specific addr and configuration, if one does
1004 * not exist create it.  Either way, ceph_opts is consumed by this
1005 * function.
1006 */
1007static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
1008{
1009        struct rbd_client *rbdc;
1010        int ret;
1011
1012        mutex_lock(&client_mutex);
1013        rbdc = rbd_client_find(ceph_opts);
1014        if (rbdc) {
1015                ceph_destroy_options(ceph_opts);
1016
1017                /*
1018                 * Using an existing client.  Make sure ->pg_pools is up to
1019                 * date before we look up the pool id in do_rbd_add().
1020                 */
1021                ret = ceph_wait_for_latest_osdmap(rbdc->client,
1022                                        rbdc->client->options->mount_timeout);
1023                if (ret) {
1024                        rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
1025                        rbd_put_client(rbdc);
1026                        rbdc = ERR_PTR(ret);
1027                }
1028        } else {
1029                rbdc = rbd_client_create(ceph_opts);
1030        }
1031        mutex_unlock(&client_mutex);
1032
1033        return rbdc;
1034}
1035
1036static bool rbd_image_format_valid(u32 image_format)
1037{
1038        return image_format == 1 || image_format == 2;
1039}
1040
1041static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
1042{
1043        size_t size;
1044        u32 snap_count;
1045
1046        /* The header has to start with the magic rbd header text */
1047        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
1048                return false;
1049
1050        /* The bio layer requires at least sector-sized I/O */
1051
1052        if (ondisk->options.order < SECTOR_SHIFT)
1053                return false;
1054
1055        /* If we use u64 in a few spots we may be able to loosen this */
1056
1057        if (ondisk->options.order > 8 * sizeof (int) - 1)
1058                return false;
1059
1060        /*
1061         * The size of a snapshot header has to fit in a size_t, and
1062         * that limits the number of snapshots.
1063         */
1064        snap_count = le32_to_cpu(ondisk->snap_count);
1065        size = SIZE_MAX - sizeof (struct ceph_snap_context);
1066        if (snap_count > size / sizeof (__le64))
1067                return false;
1068
1069        /*
1070         * Not only that, but the size of the entire the snapshot
1071         * header must also be representable in a size_t.
1072         */
1073        size -= snap_count * sizeof (__le64);
1074        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
1075                return false;
1076
1077        return true;
1078}
1079
1080/*
1081 * returns the size of an object in the image
1082 */
1083static u32 rbd_obj_bytes(struct rbd_image_header *header)
1084{
1085        return 1U << header->obj_order;
1086}
1087
1088static void rbd_init_layout(struct rbd_device *rbd_dev)
1089{
1090        if (rbd_dev->header.stripe_unit == 0 ||
1091            rbd_dev->header.stripe_count == 0) {
1092                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1093                rbd_dev->header.stripe_count = 1;
1094        }
1095
1096        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1097        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1098        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1099        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1100                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1101        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1102}
1103
1104/*
1105 * Fill an rbd image header with information from the given format 1
1106 * on-disk header.
1107 */
1108static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1109                                 struct rbd_image_header_ondisk *ondisk)
1110{
1111        struct rbd_image_header *header = &rbd_dev->header;
1112        bool first_time = header->object_prefix == NULL;
1113        struct ceph_snap_context *snapc;
1114        char *object_prefix = NULL;
1115        char *snap_names = NULL;
1116        u64 *snap_sizes = NULL;
1117        u32 snap_count;
1118        int ret = -ENOMEM;
1119        u32 i;
1120
1121        /* Allocate this now to avoid having to handle failure below */
1122
1123        if (first_time) {
1124                object_prefix = kstrndup(ondisk->object_prefix,
1125                                         sizeof(ondisk->object_prefix),
1126                                         GFP_KERNEL);
1127                if (!object_prefix)
1128                        return -ENOMEM;
1129        }
1130
1131        /* Allocate the snapshot context and fill it in */
1132
1133        snap_count = le32_to_cpu(ondisk->snap_count);
1134        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1135        if (!snapc)
1136                goto out_err;
1137        snapc->seq = le64_to_cpu(ondisk->snap_seq);
1138        if (snap_count) {
1139                struct rbd_image_snap_ondisk *snaps;
1140                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1141
1142                /* We'll keep a copy of the snapshot names... */
1143
1144                if (snap_names_len > (u64)SIZE_MAX)
1145                        goto out_2big;
1146                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1147                if (!snap_names)
1148                        goto out_err;
1149
1150                /* ...as well as the array of their sizes. */
1151                snap_sizes = kmalloc_array(snap_count,
1152                                           sizeof(*header->snap_sizes),
1153                                           GFP_KERNEL);
1154                if (!snap_sizes)
1155                        goto out_err;
1156
1157                /*
1158                 * Copy the names, and fill in each snapshot's id
1159                 * and size.
1160                 *
1161                 * Note that rbd_dev_v1_header_info() guarantees the
1162                 * ondisk buffer we're working with has
1163                 * snap_names_len bytes beyond the end of the
1164                 * snapshot id array, this memcpy() is safe.
1165                 */
1166                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1167                snaps = ondisk->snaps;
1168                for (i = 0; i < snap_count; i++) {
1169                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1170                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1171                }
1172        }
1173
1174        /* We won't fail any more, fill in the header */
1175
1176        if (first_time) {
1177                header->object_prefix = object_prefix;
1178                header->obj_order = ondisk->options.order;
1179                rbd_init_layout(rbd_dev);
1180        } else {
1181                ceph_put_snap_context(header->snapc);
1182                kfree(header->snap_names);
1183                kfree(header->snap_sizes);
1184        }
1185
1186        /* The remaining fields always get updated (when we refresh) */
1187
1188        header->image_size = le64_to_cpu(ondisk->image_size);
1189        header->snapc = snapc;
1190        header->snap_names = snap_names;
1191        header->snap_sizes = snap_sizes;
1192
1193        return 0;
1194out_2big:
1195        ret = -EIO;
1196out_err:
1197        kfree(snap_sizes);
1198        kfree(snap_names);
1199        ceph_put_snap_context(snapc);
1200        kfree(object_prefix);
1201
1202        return ret;
1203}
1204
1205static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1206{
1207        const char *snap_name;
1208
1209        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1210
1211        /* Skip over names until we find the one we are looking for */
1212
1213        snap_name = rbd_dev->header.snap_names;
1214        while (which--)
1215                snap_name += strlen(snap_name) + 1;
1216
1217        return kstrdup(snap_name, GFP_KERNEL);
1218}
1219
1220/*
1221 * Snapshot id comparison function for use with qsort()/bsearch().
1222 * Note that result is for snapshots in *descending* order.
1223 */
1224static int snapid_compare_reverse(const void *s1, const void *s2)
1225{
1226        u64 snap_id1 = *(u64 *)s1;
1227        u64 snap_id2 = *(u64 *)s2;
1228
1229        if (snap_id1 < snap_id2)
1230                return 1;
1231        return snap_id1 == snap_id2 ? 0 : -1;
1232}
1233
1234/*
1235 * Search a snapshot context to see if the given snapshot id is
1236 * present.
1237 *
1238 * Returns the position of the snapshot id in the array if it's found,
1239 * or BAD_SNAP_INDEX otherwise.
1240 *
1241 * Note: The snapshot array is in kept sorted (by the osd) in
1242 * reverse order, highest snapshot id first.
1243 */
1244static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1245{
1246        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1247        u64 *found;
1248
1249        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1250                                sizeof (snap_id), snapid_compare_reverse);
1251
1252        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1253}
1254
1255static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1256                                        u64 snap_id)
1257{
1258        u32 which;
1259        const char *snap_name;
1260
1261        which = rbd_dev_snap_index(rbd_dev, snap_id);
1262        if (which == BAD_SNAP_INDEX)
1263                return ERR_PTR(-ENOENT);
1264
1265        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1266        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1267}
1268
1269static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1270{
1271        if (snap_id == CEPH_NOSNAP)
1272                return RBD_SNAP_HEAD_NAME;
1273
1274        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1275        if (rbd_dev->image_format == 1)
1276                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1277
1278        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1279}
1280
1281static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1282                                u64 *snap_size)
1283{
1284        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1285        if (snap_id == CEPH_NOSNAP) {
1286                *snap_size = rbd_dev->header.image_size;
1287        } else if (rbd_dev->image_format == 1) {
1288                u32 which;
1289
1290                which = rbd_dev_snap_index(rbd_dev, snap_id);
1291                if (which == BAD_SNAP_INDEX)
1292                        return -ENOENT;
1293
1294                *snap_size = rbd_dev->header.snap_sizes[which];
1295        } else {
1296                u64 size = 0;
1297                int ret;
1298
1299                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1300                if (ret)
1301                        return ret;
1302
1303                *snap_size = size;
1304        }
1305        return 0;
1306}
1307
1308static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1309{
1310        u64 snap_id = rbd_dev->spec->snap_id;
1311        u64 size = 0;
1312        int ret;
1313
1314        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1315        if (ret)
1316                return ret;
1317
1318        rbd_dev->mapping.size = size;
1319        return 0;
1320}
1321
1322static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1323{
1324        rbd_dev->mapping.size = 0;
1325}
1326
1327static void zero_bvec(struct bio_vec *bv)
1328{
1329        void *buf;
1330        unsigned long flags;
1331
1332        buf = bvec_kmap_irq(bv, &flags);
1333        memset(buf, 0, bv->bv_len);
1334        flush_dcache_page(bv->bv_page);
1335        bvec_kunmap_irq(buf, &flags);
1336}
1337
1338static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1339{
1340        struct ceph_bio_iter it = *bio_pos;
1341
1342        ceph_bio_iter_advance(&it, off);
1343        ceph_bio_iter_advance_step(&it, bytes, ({
1344                zero_bvec(&bv);
1345        }));
1346}
1347
1348static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1349{
1350        struct ceph_bvec_iter it = *bvec_pos;
1351
1352        ceph_bvec_iter_advance(&it, off);
1353        ceph_bvec_iter_advance_step(&it, bytes, ({
1354                zero_bvec(&bv);
1355        }));
1356}
1357
1358/*
1359 * Zero a range in @obj_req data buffer defined by a bio (list) or
1360 * (private) bio_vec array.
1361 *
1362 * @off is relative to the start of the data buffer.
1363 */
1364static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1365                               u32 bytes)
1366{
1367        dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1368
1369        switch (obj_req->img_request->data_type) {
1370        case OBJ_REQUEST_BIO:
1371                zero_bios(&obj_req->bio_pos, off, bytes);
1372                break;
1373        case OBJ_REQUEST_BVECS:
1374        case OBJ_REQUEST_OWN_BVECS:
1375                zero_bvecs(&obj_req->bvec_pos, off, bytes);
1376                break;
1377        default:
1378                BUG();
1379        }
1380}
1381
1382static void rbd_obj_request_destroy(struct kref *kref);
1383static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1384{
1385        rbd_assert(obj_request != NULL);
1386        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1387                kref_read(&obj_request->kref));
1388        kref_put(&obj_request->kref, rbd_obj_request_destroy);
1389}
1390
1391static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1392                                        struct rbd_obj_request *obj_request)
1393{
1394        rbd_assert(obj_request->img_request == NULL);
1395
1396        /* Image request now owns object's original reference */
1397        obj_request->img_request = img_request;
1398        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1399}
1400
1401static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1402                                        struct rbd_obj_request *obj_request)
1403{
1404        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1405        list_del(&obj_request->ex.oe_item);
1406        rbd_assert(obj_request->img_request == img_request);
1407        rbd_obj_request_put(obj_request);
1408}
1409
1410static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1411{
1412        struct rbd_obj_request *obj_req = osd_req->r_priv;
1413
1414        dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1415             __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1416             obj_req->ex.oe_off, obj_req->ex.oe_len);
1417        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1418}
1419
1420/*
1421 * The default/initial value for all image request flags is 0.  Each
1422 * is conditionally set to 1 at image request initialization time
1423 * and currently never change thereafter.
1424 */
1425static void img_request_layered_set(struct rbd_img_request *img_request)
1426{
1427        set_bit(IMG_REQ_LAYERED, &img_request->flags);
1428}
1429
1430static bool img_request_layered_test(struct rbd_img_request *img_request)
1431{
1432        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1433}
1434
1435static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1436{
1437        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1438
1439        return !obj_req->ex.oe_off &&
1440               obj_req->ex.oe_len == rbd_dev->layout.object_size;
1441}
1442
1443static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1444{
1445        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1446
1447        return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1448                                        rbd_dev->layout.object_size;
1449}
1450
1451/*
1452 * Must be called after rbd_obj_calc_img_extents().
1453 */
1454static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1455{
1456        if (!obj_req->num_img_extents ||
1457            (rbd_obj_is_entire(obj_req) &&
1458             !obj_req->img_request->snapc->num_snaps))
1459                return false;
1460
1461        return true;
1462}
1463
1464static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1465{
1466        return ceph_file_extents_bytes(obj_req->img_extents,
1467                                       obj_req->num_img_extents);
1468}
1469
1470static bool rbd_img_is_write(struct rbd_img_request *img_req)
1471{
1472        switch (img_req->op_type) {
1473        case OBJ_OP_READ:
1474                return false;
1475        case OBJ_OP_WRITE:
1476        case OBJ_OP_DISCARD:
1477        case OBJ_OP_ZEROOUT:
1478                return true;
1479        default:
1480                BUG();
1481        }
1482}
1483
1484static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1485{
1486        struct rbd_obj_request *obj_req = osd_req->r_priv;
1487        int result;
1488
1489        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1490             osd_req->r_result, obj_req);
1491
1492        /*
1493         * Writes aren't allowed to return a data payload.  In some
1494         * guarded write cases (e.g. stat + zero on an empty object)
1495         * a stat response makes it through, but we don't care.
1496         */
1497        if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1498                result = 0;
1499        else
1500                result = osd_req->r_result;
1501
1502        rbd_obj_handle_request(obj_req, result);
1503}
1504
1505static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1506{
1507        struct rbd_obj_request *obj_request = osd_req->r_priv;
1508        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1509        struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1510
1511        osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1512        osd_req->r_snapid = obj_request->img_request->snap_id;
1513}
1514
1515static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1516{
1517        struct rbd_obj_request *obj_request = osd_req->r_priv;
1518
1519        osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1520        ktime_get_real_ts64(&osd_req->r_mtime);
1521        osd_req->r_data_offset = obj_request->ex.oe_off;
1522}
1523
1524static struct ceph_osd_request *
1525__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1526                          struct ceph_snap_context *snapc, int num_ops)
1527{
1528        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1529        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1530        struct ceph_osd_request *req;
1531        const char *name_format = rbd_dev->image_format == 1 ?
1532                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1533        int ret;
1534
1535        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1536        if (!req)
1537                return ERR_PTR(-ENOMEM);
1538
1539        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1540        req->r_callback = rbd_osd_req_callback;
1541        req->r_priv = obj_req;
1542
1543        /*
1544         * Data objects may be stored in a separate pool, but always in
1545         * the same namespace in that pool as the header in its pool.
1546         */
1547        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1548        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1549
1550        ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1551                               rbd_dev->header.object_prefix,
1552                               obj_req->ex.oe_objno);
1553        if (ret)
1554                return ERR_PTR(ret);
1555
1556        return req;
1557}
1558
1559static struct ceph_osd_request *
1560rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1561{
1562        return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1563                                         num_ops);
1564}
1565
1566static struct rbd_obj_request *rbd_obj_request_create(void)
1567{
1568        struct rbd_obj_request *obj_request;
1569
1570        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1571        if (!obj_request)
1572                return NULL;
1573
1574        ceph_object_extent_init(&obj_request->ex);
1575        INIT_LIST_HEAD(&obj_request->osd_reqs);
1576        mutex_init(&obj_request->state_mutex);
1577        kref_init(&obj_request->kref);
1578
1579        dout("%s %p\n", __func__, obj_request);
1580        return obj_request;
1581}
1582
1583static void rbd_obj_request_destroy(struct kref *kref)
1584{
1585        struct rbd_obj_request *obj_request;
1586        struct ceph_osd_request *osd_req;
1587        u32 i;
1588
1589        obj_request = container_of(kref, struct rbd_obj_request, kref);
1590
1591        dout("%s: obj %p\n", __func__, obj_request);
1592
1593        while (!list_empty(&obj_request->osd_reqs)) {
1594                osd_req = list_first_entry(&obj_request->osd_reqs,
1595                                    struct ceph_osd_request, r_private_item);
1596                list_del_init(&osd_req->r_private_item);
1597                ceph_osdc_put_request(osd_req);
1598        }
1599
1600        switch (obj_request->img_request->data_type) {
1601        case OBJ_REQUEST_NODATA:
1602        case OBJ_REQUEST_BIO:
1603        case OBJ_REQUEST_BVECS:
1604                break;          /* Nothing to do */
1605        case OBJ_REQUEST_OWN_BVECS:
1606                kfree(obj_request->bvec_pos.bvecs);
1607                break;
1608        default:
1609                BUG();
1610        }
1611
1612        kfree(obj_request->img_extents);
1613        if (obj_request->copyup_bvecs) {
1614                for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1615                        if (obj_request->copyup_bvecs[i].bv_page)
1616                                __free_page(obj_request->copyup_bvecs[i].bv_page);
1617                }
1618                kfree(obj_request->copyup_bvecs);
1619        }
1620
1621        kmem_cache_free(rbd_obj_request_cache, obj_request);
1622}
1623
1624/* It's OK to call this for a device with no parent */
1625
1626static void rbd_spec_put(struct rbd_spec *spec);
1627static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1628{
1629        rbd_dev_remove_parent(rbd_dev);
1630        rbd_spec_put(rbd_dev->parent_spec);
1631        rbd_dev->parent_spec = NULL;
1632        rbd_dev->parent_overlap = 0;
1633}
1634
1635/*
1636 * Parent image reference counting is used to determine when an
1637 * image's parent fields can be safely torn down--after there are no
1638 * more in-flight requests to the parent image.  When the last
1639 * reference is dropped, cleaning them up is safe.
1640 */
1641static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1642{
1643        int counter;
1644
1645        if (!rbd_dev->parent_spec)
1646                return;
1647
1648        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1649        if (counter > 0)
1650                return;
1651
1652        /* Last reference; clean up parent data structures */
1653
1654        if (!counter)
1655                rbd_dev_unparent(rbd_dev);
1656        else
1657                rbd_warn(rbd_dev, "parent reference underflow");
1658}
1659
1660/*
1661 * If an image has a non-zero parent overlap, get a reference to its
1662 * parent.
1663 *
1664 * Returns true if the rbd device has a parent with a non-zero
1665 * overlap and a reference for it was successfully taken, or
1666 * false otherwise.
1667 */
1668static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1669{
1670        int counter = 0;
1671
1672        if (!rbd_dev->parent_spec)
1673                return false;
1674
1675        if (rbd_dev->parent_overlap)
1676                counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1677
1678        if (counter < 0)
1679                rbd_warn(rbd_dev, "parent reference overflow");
1680
1681        return counter > 0;
1682}
1683
1684static void rbd_img_request_init(struct rbd_img_request *img_request,
1685                                 struct rbd_device *rbd_dev,
1686                                 enum obj_operation_type op_type)
1687{
1688        memset(img_request, 0, sizeof(*img_request));
1689
1690        img_request->rbd_dev = rbd_dev;
1691        img_request->op_type = op_type;
1692
1693        INIT_LIST_HEAD(&img_request->lock_item);
1694        INIT_LIST_HEAD(&img_request->object_extents);
1695        mutex_init(&img_request->state_mutex);
1696}
1697
1698static void rbd_img_capture_header(struct rbd_img_request *img_req)
1699{
1700        struct rbd_device *rbd_dev = img_req->rbd_dev;
1701
1702        lockdep_assert_held(&rbd_dev->header_rwsem);
1703
1704        if (rbd_img_is_write(img_req))
1705                img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1706        else
1707                img_req->snap_id = rbd_dev->spec->snap_id;
1708
1709        if (rbd_dev_parent_get(rbd_dev))
1710                img_request_layered_set(img_req);
1711}
1712
1713static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1714{
1715        struct rbd_obj_request *obj_request;
1716        struct rbd_obj_request *next_obj_request;
1717
1718        dout("%s: img %p\n", __func__, img_request);
1719
1720        WARN_ON(!list_empty(&img_request->lock_item));
1721        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1722                rbd_img_obj_request_del(img_request, obj_request);
1723
1724        if (img_request_layered_test(img_request))
1725                rbd_dev_parent_put(img_request->rbd_dev);
1726
1727        if (rbd_img_is_write(img_request))
1728                ceph_put_snap_context(img_request->snapc);
1729
1730        if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1731                kmem_cache_free(rbd_img_request_cache, img_request);
1732}
1733
1734#define BITS_PER_OBJ    2
1735#define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1736#define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1737
1738static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1739                                   u64 *index, u8 *shift)
1740{
1741        u32 off;
1742
1743        rbd_assert(objno < rbd_dev->object_map_size);
1744        *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1745        *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1746}
1747
1748static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1749{
1750        u64 index;
1751        u8 shift;
1752
1753        lockdep_assert_held(&rbd_dev->object_map_lock);
1754        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1755        return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1756}
1757
1758static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1759{
1760        u64 index;
1761        u8 shift;
1762        u8 *p;
1763
1764        lockdep_assert_held(&rbd_dev->object_map_lock);
1765        rbd_assert(!(val & ~OBJ_MASK));
1766
1767        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1768        p = &rbd_dev->object_map[index];
1769        *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1770}
1771
1772static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1773{
1774        u8 state;
1775
1776        spin_lock(&rbd_dev->object_map_lock);
1777        state = __rbd_object_map_get(rbd_dev, objno);
1778        spin_unlock(&rbd_dev->object_map_lock);
1779        return state;
1780}
1781
1782static bool use_object_map(struct rbd_device *rbd_dev)
1783{
1784        /*
1785         * An image mapped read-only can't use the object map -- it isn't
1786         * loaded because the header lock isn't acquired.  Someone else can
1787         * write to the image and update the object map behind our back.
1788         *
1789         * A snapshot can't be written to, so using the object map is always
1790         * safe.
1791         */
1792        if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1793                return false;
1794
1795        return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1796                !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1797}
1798
1799static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1800{
1801        u8 state;
1802
1803        /* fall back to default logic if object map is disabled or invalid */
1804        if (!use_object_map(rbd_dev))
1805                return true;
1806
1807        state = rbd_object_map_get(rbd_dev, objno);
1808        return state != OBJECT_NONEXISTENT;
1809}
1810
1811static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1812                                struct ceph_object_id *oid)
1813{
1814        if (snap_id == CEPH_NOSNAP)
1815                ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1816                                rbd_dev->spec->image_id);
1817        else
1818                ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1819                                rbd_dev->spec->image_id, snap_id);
1820}
1821
1822static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1823{
1824        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1825        CEPH_DEFINE_OID_ONSTACK(oid);
1826        u8 lock_type;
1827        char *lock_tag;
1828        struct ceph_locker *lockers;
1829        u32 num_lockers;
1830        bool broke_lock = false;
1831        int ret;
1832
1833        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1834
1835again:
1836        ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1837                            CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1838        if (ret != -EBUSY || broke_lock) {
1839                if (ret == -EEXIST)
1840                        ret = 0; /* already locked by myself */
1841                if (ret)
1842                        rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1843                return ret;
1844        }
1845
1846        ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1847                                 RBD_LOCK_NAME, &lock_type, &lock_tag,
1848                                 &lockers, &num_lockers);
1849        if (ret) {
1850                if (ret == -ENOENT)
1851                        goto again;
1852
1853                rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1854                return ret;
1855        }
1856
1857        kfree(lock_tag);
1858        if (num_lockers == 0)
1859                goto again;
1860
1861        rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1862                 ENTITY_NAME(lockers[0].id.name));
1863
1864        ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1865                                  RBD_LOCK_NAME, lockers[0].id.cookie,
1866                                  &lockers[0].id.name);
1867        ceph_free_lockers(lockers, num_lockers);
1868        if (ret) {
1869                if (ret == -ENOENT)
1870                        goto again;
1871
1872                rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1873                return ret;
1874        }
1875
1876        broke_lock = true;
1877        goto again;
1878}
1879
1880static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1881{
1882        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1883        CEPH_DEFINE_OID_ONSTACK(oid);
1884        int ret;
1885
1886        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1887
1888        ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1889                              "");
1890        if (ret && ret != -ENOENT)
1891                rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1892}
1893
1894static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1895{
1896        u8 struct_v;
1897        u32 struct_len;
1898        u32 header_len;
1899        void *header_end;
1900        int ret;
1901
1902        ceph_decode_32_safe(p, end, header_len, e_inval);
1903        header_end = *p + header_len;
1904
1905        ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1906                                  &struct_len);
1907        if (ret)
1908                return ret;
1909
1910        ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1911
1912        *p = header_end;
1913        return 0;
1914
1915e_inval:
1916        return -EINVAL;
1917}
1918
1919static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1920{
1921        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1922        CEPH_DEFINE_OID_ONSTACK(oid);
1923        struct page **pages;
1924        void *p, *end;
1925        size_t reply_len;
1926        u64 num_objects;
1927        u64 object_map_bytes;
1928        u64 object_map_size;
1929        int num_pages;
1930        int ret;
1931
1932        rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1933
1934        num_objects = ceph_get_num_objects(&rbd_dev->layout,
1935                                           rbd_dev->mapping.size);
1936        object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1937                                            BITS_PER_BYTE);
1938        num_pages = calc_pages_for(0, object_map_bytes) + 1;
1939        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1940        if (IS_ERR(pages))
1941                return PTR_ERR(pages);
1942
1943        reply_len = num_pages * PAGE_SIZE;
1944        rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1945        ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1946                             "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1947                             NULL, 0, pages, &reply_len);
1948        if (ret)
1949                goto out;
1950
1951        p = page_address(pages[0]);
1952        end = p + min(reply_len, (size_t)PAGE_SIZE);
1953        ret = decode_object_map_header(&p, end, &object_map_size);
1954        if (ret)
1955                goto out;
1956
1957        if (object_map_size != num_objects) {
1958                rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1959                         object_map_size, num_objects);
1960                ret = -EINVAL;
1961                goto out;
1962        }
1963
1964        if (offset_in_page(p) + object_map_bytes > reply_len) {
1965                ret = -EINVAL;
1966                goto out;
1967        }
1968
1969        rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1970        if (!rbd_dev->object_map) {
1971                ret = -ENOMEM;
1972                goto out;
1973        }
1974
1975        rbd_dev->object_map_size = object_map_size;
1976        ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1977                                   offset_in_page(p), object_map_bytes);
1978
1979out:
1980        ceph_release_page_vector(pages, num_pages);
1981        return ret;
1982}
1983
1984static void rbd_object_map_free(struct rbd_device *rbd_dev)
1985{
1986        kvfree(rbd_dev->object_map);
1987        rbd_dev->object_map = NULL;
1988        rbd_dev->object_map_size = 0;
1989}
1990
1991static int rbd_object_map_load(struct rbd_device *rbd_dev)
1992{
1993        int ret;
1994
1995        ret = __rbd_object_map_load(rbd_dev);
1996        if (ret)
1997                return ret;
1998
1999        ret = rbd_dev_v2_get_flags(rbd_dev);
2000        if (ret) {
2001                rbd_object_map_free(rbd_dev);
2002                return ret;
2003        }
2004
2005        if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
2006                rbd_warn(rbd_dev, "object map is invalid");
2007
2008        return 0;
2009}
2010
2011static int rbd_object_map_open(struct rbd_device *rbd_dev)
2012{
2013        int ret;
2014
2015        ret = rbd_object_map_lock(rbd_dev);
2016        if (ret)
2017                return ret;
2018
2019        ret = rbd_object_map_load(rbd_dev);
2020        if (ret) {
2021                rbd_object_map_unlock(rbd_dev);
2022                return ret;
2023        }
2024
2025        return 0;
2026}
2027
2028static void rbd_object_map_close(struct rbd_device *rbd_dev)
2029{
2030        rbd_object_map_free(rbd_dev);
2031        rbd_object_map_unlock(rbd_dev);
2032}
2033
2034/*
2035 * This function needs snap_id (or more precisely just something to
2036 * distinguish between HEAD and snapshot object maps), new_state and
2037 * current_state that were passed to rbd_object_map_update().
2038 *
2039 * To avoid allocating and stashing a context we piggyback on the OSD
2040 * request.  A HEAD update has two ops (assert_locked).  For new_state
2041 * and current_state we decode our own object_map_update op, encoded in
2042 * rbd_cls_object_map_update().
2043 */
2044static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
2045                                        struct ceph_osd_request *osd_req)
2046{
2047        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2048        struct ceph_osd_data *osd_data;
2049        u64 objno;
2050        u8 state, new_state, uninitialized_var(current_state);
2051        bool has_current_state;
2052        void *p;
2053
2054        if (osd_req->r_result)
2055                return osd_req->r_result;
2056
2057        /*
2058         * Nothing to do for a snapshot object map.
2059         */
2060        if (osd_req->r_num_ops == 1)
2061                return 0;
2062
2063        /*
2064         * Update in-memory HEAD object map.
2065         */
2066        rbd_assert(osd_req->r_num_ops == 2);
2067        osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2068        rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2069
2070        p = page_address(osd_data->pages[0]);
2071        objno = ceph_decode_64(&p);
2072        rbd_assert(objno == obj_req->ex.oe_objno);
2073        rbd_assert(ceph_decode_64(&p) == objno + 1);
2074        new_state = ceph_decode_8(&p);
2075        has_current_state = ceph_decode_8(&p);
2076        if (has_current_state)
2077                current_state = ceph_decode_8(&p);
2078
2079        spin_lock(&rbd_dev->object_map_lock);
2080        state = __rbd_object_map_get(rbd_dev, objno);
2081        if (!has_current_state || current_state == state ||
2082            (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2083                __rbd_object_map_set(rbd_dev, objno, new_state);
2084        spin_unlock(&rbd_dev->object_map_lock);
2085
2086        return 0;
2087}
2088
2089static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2090{
2091        struct rbd_obj_request *obj_req = osd_req->r_priv;
2092        int result;
2093
2094        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2095             osd_req->r_result, obj_req);
2096
2097        result = rbd_object_map_update_finish(obj_req, osd_req);
2098        rbd_obj_handle_request(obj_req, result);
2099}
2100
2101static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2102{
2103        u8 state = rbd_object_map_get(rbd_dev, objno);
2104
2105        if (state == new_state ||
2106            (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2107            (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2108                return false;
2109
2110        return true;
2111}
2112
2113static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2114                                     int which, u64 objno, u8 new_state,
2115                                     const u8 *current_state)
2116{
2117        struct page **pages;
2118        void *p, *start;
2119        int ret;
2120
2121        ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2122        if (ret)
2123                return ret;
2124
2125        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2126        if (IS_ERR(pages))
2127                return PTR_ERR(pages);
2128
2129        p = start = page_address(pages[0]);
2130        ceph_encode_64(&p, objno);
2131        ceph_encode_64(&p, objno + 1);
2132        ceph_encode_8(&p, new_state);
2133        if (current_state) {
2134                ceph_encode_8(&p, 1);
2135                ceph_encode_8(&p, *current_state);
2136        } else {
2137                ceph_encode_8(&p, 0);
2138        }
2139
2140        osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2141                                          false, true);
2142        return 0;
2143}
2144
2145/*
2146 * Return:
2147 *   0 - object map update sent
2148 *   1 - object map update isn't needed
2149 *  <0 - error
2150 */
2151static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2152                                 u8 new_state, const u8 *current_state)
2153{
2154        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2155        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2156        struct ceph_osd_request *req;
2157        int num_ops = 1;
2158        int which = 0;
2159        int ret;
2160
2161        if (snap_id == CEPH_NOSNAP) {
2162                if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2163                        return 1;
2164
2165                num_ops++; /* assert_locked */
2166        }
2167
2168        req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2169        if (!req)
2170                return -ENOMEM;
2171
2172        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2173        req->r_callback = rbd_object_map_callback;
2174        req->r_priv = obj_req;
2175
2176        rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2177        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2178        req->r_flags = CEPH_OSD_FLAG_WRITE;
2179        ktime_get_real_ts64(&req->r_mtime);
2180
2181        if (snap_id == CEPH_NOSNAP) {
2182                /*
2183                 * Protect against possible race conditions during lock
2184                 * ownership transitions.
2185                 */
2186                ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2187                                             CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2188                if (ret)
2189                        return ret;
2190        }
2191
2192        ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2193                                        new_state, current_state);
2194        if (ret)
2195                return ret;
2196
2197        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2198        if (ret)
2199                return ret;
2200
2201        ceph_osdc_start_request(osdc, req, false);
2202        return 0;
2203}
2204
2205static void prune_extents(struct ceph_file_extent *img_extents,
2206                          u32 *num_img_extents, u64 overlap)
2207{
2208        u32 cnt = *num_img_extents;
2209
2210        /* drop extents completely beyond the overlap */
2211        while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2212                cnt--;
2213
2214        if (cnt) {
2215                struct ceph_file_extent *ex = &img_extents[cnt - 1];
2216
2217                /* trim final overlapping extent */
2218                if (ex->fe_off + ex->fe_len > overlap)
2219                        ex->fe_len = overlap - ex->fe_off;
2220        }
2221
2222        *num_img_extents = cnt;
2223}
2224
2225/*
2226 * Determine the byte range(s) covered by either just the object extent
2227 * or the entire object in the parent image.
2228 */
2229static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2230                                    bool entire)
2231{
2232        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2233        int ret;
2234
2235        if (!rbd_dev->parent_overlap)
2236                return 0;
2237
2238        ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2239                                  entire ? 0 : obj_req->ex.oe_off,
2240                                  entire ? rbd_dev->layout.object_size :
2241                                                        obj_req->ex.oe_len,
2242                                  &obj_req->img_extents,
2243                                  &obj_req->num_img_extents);
2244        if (ret)
2245                return ret;
2246
2247        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2248                      rbd_dev->parent_overlap);
2249        return 0;
2250}
2251
2252static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2253{
2254        struct rbd_obj_request *obj_req = osd_req->r_priv;
2255
2256        switch (obj_req->img_request->data_type) {
2257        case OBJ_REQUEST_BIO:
2258                osd_req_op_extent_osd_data_bio(osd_req, which,
2259                                               &obj_req->bio_pos,
2260                                               obj_req->ex.oe_len);
2261                break;
2262        case OBJ_REQUEST_BVECS:
2263        case OBJ_REQUEST_OWN_BVECS:
2264                rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2265                                                        obj_req->ex.oe_len);
2266                rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2267                osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2268                                                    &obj_req->bvec_pos);
2269                break;
2270        default:
2271                BUG();
2272        }
2273}
2274
2275static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2276{
2277        struct page **pages;
2278
2279        /*
2280         * The response data for a STAT call consists of:
2281         *     le64 length;
2282         *     struct {
2283         *         le32 tv_sec;
2284         *         le32 tv_nsec;
2285         *     } mtime;
2286         */
2287        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2288        if (IS_ERR(pages))
2289                return PTR_ERR(pages);
2290
2291        osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2292        osd_req_op_raw_data_in_pages(osd_req, which, pages,
2293                                     8 + sizeof(struct ceph_timespec),
2294                                     0, false, true);
2295        return 0;
2296}
2297
2298static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2299                                u32 bytes)
2300{
2301        struct rbd_obj_request *obj_req = osd_req->r_priv;
2302        int ret;
2303
2304        ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2305        if (ret)
2306                return ret;
2307
2308        osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2309                                          obj_req->copyup_bvec_count, bytes);
2310        return 0;
2311}
2312
2313static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2314{
2315        obj_req->read_state = RBD_OBJ_READ_START;
2316        return 0;
2317}
2318
2319static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2320                                      int which)
2321{
2322        struct rbd_obj_request *obj_req = osd_req->r_priv;
2323        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2324        u16 opcode;
2325
2326        if (!use_object_map(rbd_dev) ||
2327            !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2328                osd_req_op_alloc_hint_init(osd_req, which++,
2329                                           rbd_dev->layout.object_size,
2330                                           rbd_dev->layout.object_size,
2331                                           rbd_dev->opts->alloc_hint_flags);
2332        }
2333
2334        if (rbd_obj_is_entire(obj_req))
2335                opcode = CEPH_OSD_OP_WRITEFULL;
2336        else
2337                opcode = CEPH_OSD_OP_WRITE;
2338
2339        osd_req_op_extent_init(osd_req, which, opcode,
2340                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2341        rbd_osd_setup_data(osd_req, which);
2342}
2343
2344static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2345{
2346        int ret;
2347
2348        /* reverse map the entire object onto the parent */
2349        ret = rbd_obj_calc_img_extents(obj_req, true);
2350        if (ret)
2351                return ret;
2352
2353        if (rbd_obj_copyup_enabled(obj_req))
2354                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2355
2356        obj_req->write_state = RBD_OBJ_WRITE_START;
2357        return 0;
2358}
2359
2360static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2361{
2362        return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2363                                          CEPH_OSD_OP_ZERO;
2364}
2365
2366static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2367                                        int which)
2368{
2369        struct rbd_obj_request *obj_req = osd_req->r_priv;
2370
2371        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2372                rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2373                osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2374        } else {
2375                osd_req_op_extent_init(osd_req, which,
2376                                       truncate_or_zero_opcode(obj_req),
2377                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2378                                       0, 0);
2379        }
2380}
2381
2382static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2383{
2384        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2385        u64 off, next_off;
2386        int ret;
2387
2388        /*
2389         * Align the range to alloc_size boundary and punt on discards
2390         * that are too small to free up any space.
2391         *
2392         * alloc_size == object_size && is_tail() is a special case for
2393         * filestore with filestore_punch_hole = false, needed to allow
2394         * truncate (in addition to delete).
2395         */
2396        if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2397            !rbd_obj_is_tail(obj_req)) {
2398                off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2399                next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2400                                      rbd_dev->opts->alloc_size);
2401                if (off >= next_off)
2402                        return 1;
2403
2404                dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2405                     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2406                     off, next_off - off);
2407                obj_req->ex.oe_off = off;
2408                obj_req->ex.oe_len = next_off - off;
2409        }
2410
2411        /* reverse map the entire object onto the parent */
2412        ret = rbd_obj_calc_img_extents(obj_req, true);
2413        if (ret)
2414                return ret;
2415
2416        obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2417        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2418                obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2419
2420        obj_req->write_state = RBD_OBJ_WRITE_START;
2421        return 0;
2422}
2423
2424static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2425                                        int which)
2426{
2427        struct rbd_obj_request *obj_req = osd_req->r_priv;
2428        u16 opcode;
2429
2430        if (rbd_obj_is_entire(obj_req)) {
2431                if (obj_req->num_img_extents) {
2432                        if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2433                                osd_req_op_init(osd_req, which++,
2434                                                CEPH_OSD_OP_CREATE, 0);
2435                        opcode = CEPH_OSD_OP_TRUNCATE;
2436                } else {
2437                        rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2438                        osd_req_op_init(osd_req, which++,
2439                                        CEPH_OSD_OP_DELETE, 0);
2440                        opcode = 0;
2441                }
2442        } else {
2443                opcode = truncate_or_zero_opcode(obj_req);
2444        }
2445
2446        if (opcode)
2447                osd_req_op_extent_init(osd_req, which, opcode,
2448                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2449                                       0, 0);
2450}
2451
2452static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2453{
2454        int ret;
2455
2456        /* reverse map the entire object onto the parent */
2457        ret = rbd_obj_calc_img_extents(obj_req, true);
2458        if (ret)
2459                return ret;
2460
2461        if (rbd_obj_copyup_enabled(obj_req))
2462                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2463        if (!obj_req->num_img_extents) {
2464                obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2465                if (rbd_obj_is_entire(obj_req))
2466                        obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2467        }
2468
2469        obj_req->write_state = RBD_OBJ_WRITE_START;
2470        return 0;
2471}
2472
2473static int count_write_ops(struct rbd_obj_request *obj_req)
2474{
2475        struct rbd_img_request *img_req = obj_req->img_request;
2476
2477        switch (img_req->op_type) {
2478        case OBJ_OP_WRITE:
2479                if (!use_object_map(img_req->rbd_dev) ||
2480                    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2481                        return 2; /* setallochint + write/writefull */
2482
2483                return 1; /* write/writefull */
2484        case OBJ_OP_DISCARD:
2485                return 1; /* delete/truncate/zero */
2486        case OBJ_OP_ZEROOUT:
2487                if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2488                    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2489                        return 2; /* create + truncate */
2490
2491                return 1; /* delete/truncate/zero */
2492        default:
2493                BUG();
2494        }
2495}
2496
2497static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2498                                    int which)
2499{
2500        struct rbd_obj_request *obj_req = osd_req->r_priv;
2501
2502        switch (obj_req->img_request->op_type) {
2503        case OBJ_OP_WRITE:
2504                __rbd_osd_setup_write_ops(osd_req, which);
2505                break;
2506        case OBJ_OP_DISCARD:
2507                __rbd_osd_setup_discard_ops(osd_req, which);
2508                break;
2509        case OBJ_OP_ZEROOUT:
2510                __rbd_osd_setup_zeroout_ops(osd_req, which);
2511                break;
2512        default:
2513                BUG();
2514        }
2515}
2516
2517/*
2518 * Prune the list of object requests (adjust offset and/or length, drop
2519 * redundant requests).  Prepare object request state machines and image
2520 * request state machine for execution.
2521 */
2522static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2523{
2524        struct rbd_obj_request *obj_req, *next_obj_req;
2525        int ret;
2526
2527        for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2528                switch (img_req->op_type) {
2529                case OBJ_OP_READ:
2530                        ret = rbd_obj_init_read(obj_req);
2531                        break;
2532                case OBJ_OP_WRITE:
2533                        ret = rbd_obj_init_write(obj_req);
2534                        break;
2535                case OBJ_OP_DISCARD:
2536                        ret = rbd_obj_init_discard(obj_req);
2537                        break;
2538                case OBJ_OP_ZEROOUT:
2539                        ret = rbd_obj_init_zeroout(obj_req);
2540                        break;
2541                default:
2542                        BUG();
2543                }
2544                if (ret < 0)
2545                        return ret;
2546                if (ret > 0) {
2547                        rbd_img_obj_request_del(img_req, obj_req);
2548                        continue;
2549                }
2550        }
2551
2552        img_req->state = RBD_IMG_START;
2553        return 0;
2554}
2555
2556union rbd_img_fill_iter {
2557        struct ceph_bio_iter    bio_iter;
2558        struct ceph_bvec_iter   bvec_iter;
2559};
2560
2561struct rbd_img_fill_ctx {
2562        enum obj_request_type   pos_type;
2563        union rbd_img_fill_iter *pos;
2564        union rbd_img_fill_iter iter;
2565        ceph_object_extent_fn_t set_pos_fn;
2566        ceph_object_extent_fn_t count_fn;
2567        ceph_object_extent_fn_t copy_fn;
2568};
2569
2570static struct ceph_object_extent *alloc_object_extent(void *arg)
2571{
2572        struct rbd_img_request *img_req = arg;
2573        struct rbd_obj_request *obj_req;
2574
2575        obj_req = rbd_obj_request_create();
2576        if (!obj_req)
2577                return NULL;
2578
2579        rbd_img_obj_request_add(img_req, obj_req);
2580        return &obj_req->ex;
2581}
2582
2583/*
2584 * While su != os && sc == 1 is technically not fancy (it's the same
2585 * layout as su == os && sc == 1), we can't use the nocopy path for it
2586 * because ->set_pos_fn() should be called only once per object.
2587 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2588 * treat su != os && sc == 1 as fancy.
2589 */
2590static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2591{
2592        return l->stripe_unit != l->object_size;
2593}
2594
2595static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2596                                       struct ceph_file_extent *img_extents,
2597                                       u32 num_img_extents,
2598                                       struct rbd_img_fill_ctx *fctx)
2599{
2600        u32 i;
2601        int ret;
2602
2603        img_req->data_type = fctx->pos_type;
2604
2605        /*
2606         * Create object requests and set each object request's starting
2607         * position in the provided bio (list) or bio_vec array.
2608         */
2609        fctx->iter = *fctx->pos;
2610        for (i = 0; i < num_img_extents; i++) {
2611                ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2612                                           img_extents[i].fe_off,
2613                                           img_extents[i].fe_len,
2614                                           &img_req->object_extents,
2615                                           alloc_object_extent, img_req,
2616                                           fctx->set_pos_fn, &fctx->iter);
2617                if (ret)
2618                        return ret;
2619        }
2620
2621        return __rbd_img_fill_request(img_req);
2622}
2623
2624/*
2625 * Map a list of image extents to a list of object extents, create the
2626 * corresponding object requests (normally each to a different object,
2627 * but not always) and add them to @img_req.  For each object request,
2628 * set up its data descriptor to point to the corresponding chunk(s) of
2629 * @fctx->pos data buffer.
2630 *
2631 * Because ceph_file_to_extents() will merge adjacent object extents
2632 * together, each object request's data descriptor may point to multiple
2633 * different chunks of @fctx->pos data buffer.
2634 *
2635 * @fctx->pos data buffer is assumed to be large enough.
2636 */
2637static int rbd_img_fill_request(struct rbd_img_request *img_req,
2638                                struct ceph_file_extent *img_extents,
2639                                u32 num_img_extents,
2640                                struct rbd_img_fill_ctx *fctx)
2641{
2642        struct rbd_device *rbd_dev = img_req->rbd_dev;
2643        struct rbd_obj_request *obj_req;
2644        u32 i;
2645        int ret;
2646
2647        if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2648            !rbd_layout_is_fancy(&rbd_dev->layout))
2649                return rbd_img_fill_request_nocopy(img_req, img_extents,
2650                                                   num_img_extents, fctx);
2651
2652        img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2653
2654        /*
2655         * Create object requests and determine ->bvec_count for each object
2656         * request.  Note that ->bvec_count sum over all object requests may
2657         * be greater than the number of bio_vecs in the provided bio (list)
2658         * or bio_vec array because when mapped, those bio_vecs can straddle
2659         * stripe unit boundaries.
2660         */
2661        fctx->iter = *fctx->pos;
2662        for (i = 0; i < num_img_extents; i++) {
2663                ret = ceph_file_to_extents(&rbd_dev->layout,
2664                                           img_extents[i].fe_off,
2665                                           img_extents[i].fe_len,
2666                                           &img_req->object_extents,
2667                                           alloc_object_extent, img_req,
2668                                           fctx->count_fn, &fctx->iter);
2669                if (ret)
2670                        return ret;
2671        }
2672
2673        for_each_obj_request(img_req, obj_req) {
2674                obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2675                                              sizeof(*obj_req->bvec_pos.bvecs),
2676                                              GFP_NOIO);
2677                if (!obj_req->bvec_pos.bvecs)
2678                        return -ENOMEM;
2679        }
2680
2681        /*
2682         * Fill in each object request's private bio_vec array, splitting and
2683         * rearranging the provided bio_vecs in stripe unit chunks as needed.
2684         */
2685        fctx->iter = *fctx->pos;
2686        for (i = 0; i < num_img_extents; i++) {
2687                ret = ceph_iterate_extents(&rbd_dev->layout,
2688                                           img_extents[i].fe_off,
2689                                           img_extents[i].fe_len,
2690                                           &img_req->object_extents,
2691                                           fctx->copy_fn, &fctx->iter);
2692                if (ret)
2693                        return ret;
2694        }
2695
2696        return __rbd_img_fill_request(img_req);
2697}
2698
2699static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2700                               u64 off, u64 len)
2701{
2702        struct ceph_file_extent ex = { off, len };
2703        union rbd_img_fill_iter dummy = {};
2704        struct rbd_img_fill_ctx fctx = {
2705                .pos_type = OBJ_REQUEST_NODATA,
2706                .pos = &dummy,
2707        };
2708
2709        return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2710}
2711
2712static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2713{
2714        struct rbd_obj_request *obj_req =
2715            container_of(ex, struct rbd_obj_request, ex);
2716        struct ceph_bio_iter *it = arg;
2717
2718        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2719        obj_req->bio_pos = *it;
2720        ceph_bio_iter_advance(it, bytes);
2721}
2722
2723static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2724{
2725        struct rbd_obj_request *obj_req =
2726            container_of(ex, struct rbd_obj_request, ex);
2727        struct ceph_bio_iter *it = arg;
2728
2729        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2730        ceph_bio_iter_advance_step(it, bytes, ({
2731                obj_req->bvec_count++;
2732        }));
2733
2734}
2735
2736static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2737{
2738        struct rbd_obj_request *obj_req =
2739            container_of(ex, struct rbd_obj_request, ex);
2740        struct ceph_bio_iter *it = arg;
2741
2742        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2743        ceph_bio_iter_advance_step(it, bytes, ({
2744                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2745                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2746        }));
2747}
2748
2749static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2750                                   struct ceph_file_extent *img_extents,
2751                                   u32 num_img_extents,
2752                                   struct ceph_bio_iter *bio_pos)
2753{
2754        struct rbd_img_fill_ctx fctx = {
2755                .pos_type = OBJ_REQUEST_BIO,
2756                .pos = (union rbd_img_fill_iter *)bio_pos,
2757                .set_pos_fn = set_bio_pos,
2758                .count_fn = count_bio_bvecs,
2759                .copy_fn = copy_bio_bvecs,
2760        };
2761
2762        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2763                                    &fctx);
2764}
2765
2766static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2767                                 u64 off, u64 len, struct bio *bio)
2768{
2769        struct ceph_file_extent ex = { off, len };
2770        struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2771
2772        return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2773}
2774
2775static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2776{
2777        struct rbd_obj_request *obj_req =
2778            container_of(ex, struct rbd_obj_request, ex);
2779        struct ceph_bvec_iter *it = arg;
2780
2781        obj_req->bvec_pos = *it;
2782        ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2783        ceph_bvec_iter_advance(it, bytes);
2784}
2785
2786static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2787{
2788        struct rbd_obj_request *obj_req =
2789            container_of(ex, struct rbd_obj_request, ex);
2790        struct ceph_bvec_iter *it = arg;
2791
2792        ceph_bvec_iter_advance_step(it, bytes, ({
2793                obj_req->bvec_count++;
2794        }));
2795}
2796
2797static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2798{
2799        struct rbd_obj_request *obj_req =
2800            container_of(ex, struct rbd_obj_request, ex);
2801        struct ceph_bvec_iter *it = arg;
2802
2803        ceph_bvec_iter_advance_step(it, bytes, ({
2804                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2805                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2806        }));
2807}
2808
2809static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2810                                     struct ceph_file_extent *img_extents,
2811                                     u32 num_img_extents,
2812                                     struct ceph_bvec_iter *bvec_pos)
2813{
2814        struct rbd_img_fill_ctx fctx = {
2815                .pos_type = OBJ_REQUEST_BVECS,
2816                .pos = (union rbd_img_fill_iter *)bvec_pos,
2817                .set_pos_fn = set_bvec_pos,
2818                .count_fn = count_bvecs,
2819                .copy_fn = copy_bvecs,
2820        };
2821
2822        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2823                                    &fctx);
2824}
2825
2826static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2827                                   struct ceph_file_extent *img_extents,
2828                                   u32 num_img_extents,
2829                                   struct bio_vec *bvecs)
2830{
2831        struct ceph_bvec_iter it = {
2832                .bvecs = bvecs,
2833                .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2834                                                             num_img_extents) },
2835        };
2836
2837        return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2838                                         &it);
2839}
2840
2841static void rbd_img_handle_request_work(struct work_struct *work)
2842{
2843        struct rbd_img_request *img_req =
2844            container_of(work, struct rbd_img_request, work);
2845
2846        rbd_img_handle_request(img_req, img_req->work_result);
2847}
2848
2849static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2850{
2851        INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2852        img_req->work_result = result;
2853        queue_work(rbd_wq, &img_req->work);
2854}
2855
2856static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2857{
2858        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2859
2860        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2861                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2862                return true;
2863        }
2864
2865        dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2866             obj_req->ex.oe_objno);
2867        return false;
2868}
2869
2870static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2871{
2872        struct ceph_osd_request *osd_req;
2873        int ret;
2874
2875        osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2876        if (IS_ERR(osd_req))
2877                return PTR_ERR(osd_req);
2878
2879        osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2880                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2881        rbd_osd_setup_data(osd_req, 0);
2882        rbd_osd_format_read(osd_req);
2883
2884        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2885        if (ret)
2886                return ret;
2887
2888        rbd_osd_submit(osd_req);
2889        return 0;
2890}
2891
2892static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2893{
2894        struct rbd_img_request *img_req = obj_req->img_request;
2895        struct rbd_device *parent = img_req->rbd_dev->parent;
2896        struct rbd_img_request *child_img_req;
2897        int ret;
2898
2899        child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2900        if (!child_img_req)
2901                return -ENOMEM;
2902
2903        rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2904        __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2905        child_img_req->obj_request = obj_req;
2906
2907        down_read(&parent->header_rwsem);
2908        rbd_img_capture_header(child_img_req);
2909        up_read(&parent->header_rwsem);
2910
2911        dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2912             obj_req);
2913
2914        if (!rbd_img_is_write(img_req)) {
2915                switch (img_req->data_type) {
2916                case OBJ_REQUEST_BIO:
2917                        ret = __rbd_img_fill_from_bio(child_img_req,
2918                                                      obj_req->img_extents,
2919                                                      obj_req->num_img_extents,
2920                                                      &obj_req->bio_pos);
2921                        break;
2922                case OBJ_REQUEST_BVECS:
2923                case OBJ_REQUEST_OWN_BVECS:
2924                        ret = __rbd_img_fill_from_bvecs(child_img_req,
2925                                                      obj_req->img_extents,
2926                                                      obj_req->num_img_extents,
2927                                                      &obj_req->bvec_pos);
2928                        break;
2929                default:
2930                        BUG();
2931                }
2932        } else {
2933                ret = rbd_img_fill_from_bvecs(child_img_req,
2934                                              obj_req->img_extents,
2935                                              obj_req->num_img_extents,
2936                                              obj_req->copyup_bvecs);
2937        }
2938        if (ret) {
2939                rbd_img_request_destroy(child_img_req);
2940                return ret;
2941        }
2942
2943        /* avoid parent chain recursion */
2944        rbd_img_schedule(child_img_req, 0);
2945        return 0;
2946}
2947
2948static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2949{
2950        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2951        int ret;
2952
2953again:
2954        switch (obj_req->read_state) {
2955        case RBD_OBJ_READ_START:
2956                rbd_assert(!*result);
2957
2958                if (!rbd_obj_may_exist(obj_req)) {
2959                        *result = -ENOENT;
2960                        obj_req->read_state = RBD_OBJ_READ_OBJECT;
2961                        goto again;
2962                }
2963
2964                ret = rbd_obj_read_object(obj_req);
2965                if (ret) {
2966                        *result = ret;
2967                        return true;
2968                }
2969                obj_req->read_state = RBD_OBJ_READ_OBJECT;
2970                return false;
2971        case RBD_OBJ_READ_OBJECT:
2972                if (*result == -ENOENT && rbd_dev->parent_overlap) {
2973                        /* reverse map this object extent onto the parent */
2974                        ret = rbd_obj_calc_img_extents(obj_req, false);
2975                        if (ret) {
2976                                *result = ret;
2977                                return true;
2978                        }
2979                        if (obj_req->num_img_extents) {
2980                                ret = rbd_obj_read_from_parent(obj_req);
2981                                if (ret) {
2982                                        *result = ret;
2983                                        return true;
2984                                }
2985                                obj_req->read_state = RBD_OBJ_READ_PARENT;
2986                                return false;
2987                        }
2988                }
2989
2990                /*
2991                 * -ENOENT means a hole in the image -- zero-fill the entire
2992                 * length of the request.  A short read also implies zero-fill
2993                 * to the end of the request.
2994                 */
2995                if (*result == -ENOENT) {
2996                        rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2997                        *result = 0;
2998                } else if (*result >= 0) {
2999                        if (*result < obj_req->ex.oe_len)
3000                                rbd_obj_zero_range(obj_req, *result,
3001                                                obj_req->ex.oe_len - *result);
3002                        else
3003                                rbd_assert(*result == obj_req->ex.oe_len);
3004                        *result = 0;
3005                }
3006                return true;
3007        case RBD_OBJ_READ_PARENT:
3008                /*
3009                 * The parent image is read only up to the overlap -- zero-fill
3010                 * from the overlap to the end of the request.
3011                 */
3012                if (!*result) {
3013                        u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
3014
3015                        if (obj_overlap < obj_req->ex.oe_len)
3016                                rbd_obj_zero_range(obj_req, obj_overlap,
3017                                            obj_req->ex.oe_len - obj_overlap);
3018                }
3019                return true;
3020        default:
3021                BUG();
3022        }
3023}
3024
3025static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3026{
3027        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3028
3029        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3030                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3031
3032        if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3033            (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3034                dout("%s %p noop for nonexistent\n", __func__, obj_req);
3035                return true;
3036        }
3037
3038        return false;
3039}
3040
3041/*
3042 * Return:
3043 *   0 - object map update sent
3044 *   1 - object map update isn't needed
3045 *  <0 - error
3046 */
3047static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3048{
3049        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3050        u8 new_state;
3051
3052        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3053                return 1;
3054
3055        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3056                new_state = OBJECT_PENDING;
3057        else
3058                new_state = OBJECT_EXISTS;
3059
3060        return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3061}
3062
3063static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3064{
3065        struct ceph_osd_request *osd_req;
3066        int num_ops = count_write_ops(obj_req);
3067        int which = 0;
3068        int ret;
3069
3070        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3071                num_ops++; /* stat */
3072
3073        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3074        if (IS_ERR(osd_req))
3075                return PTR_ERR(osd_req);
3076
3077        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3078                ret = rbd_osd_setup_stat(osd_req, which++);
3079                if (ret)
3080                        return ret;
3081        }
3082
3083        rbd_osd_setup_write_ops(osd_req, which);
3084        rbd_osd_format_write(osd_req);
3085
3086        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3087        if (ret)
3088                return ret;
3089
3090        rbd_osd_submit(osd_req);
3091        return 0;
3092}
3093
3094/*
3095 * copyup_bvecs pages are never highmem pages
3096 */
3097static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3098{
3099        struct ceph_bvec_iter it = {
3100                .bvecs = bvecs,
3101                .iter = { .bi_size = bytes },
3102        };
3103
3104        ceph_bvec_iter_advance_step(&it, bytes, ({
3105                if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3106                               bv.bv_len))
3107                        return false;
3108        }));
3109        return true;
3110}
3111
3112#define MODS_ONLY       U32_MAX
3113
3114static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3115                                      u32 bytes)
3116{
3117        struct ceph_osd_request *osd_req;
3118        int ret;
3119
3120        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3121        rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3122
3123        osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3124        if (IS_ERR(osd_req))
3125                return PTR_ERR(osd_req);
3126
3127        ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3128        if (ret)
3129                return ret;
3130
3131        rbd_osd_format_write(osd_req);
3132
3133        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3134        if (ret)
3135                return ret;
3136
3137        rbd_osd_submit(osd_req);
3138        return 0;
3139}
3140
3141static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3142                                        u32 bytes)
3143{
3144        struct ceph_osd_request *osd_req;
3145        int num_ops = count_write_ops(obj_req);
3146        int which = 0;
3147        int ret;
3148
3149        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3150
3151        if (bytes != MODS_ONLY)
3152                num_ops++; /* copyup */
3153
3154        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3155        if (IS_ERR(osd_req))
3156                return PTR_ERR(osd_req);
3157
3158        if (bytes != MODS_ONLY) {
3159                ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3160                if (ret)
3161                        return ret;
3162        }
3163
3164        rbd_osd_setup_write_ops(osd_req, which);
3165        rbd_osd_format_write(osd_req);
3166
3167        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3168        if (ret)
3169                return ret;
3170
3171        rbd_osd_submit(osd_req);
3172        return 0;
3173}
3174
3175static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3176{
3177        u32 i;
3178
3179        rbd_assert(!obj_req->copyup_bvecs);
3180        obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3181        obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3182                                        sizeof(*obj_req->copyup_bvecs),
3183                                        GFP_NOIO);
3184        if (!obj_req->copyup_bvecs)
3185                return -ENOMEM;
3186
3187        for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3188                unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3189
3190                obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3191                if (!obj_req->copyup_bvecs[i].bv_page)
3192                        return -ENOMEM;
3193
3194                obj_req->copyup_bvecs[i].bv_offset = 0;
3195                obj_req->copyup_bvecs[i].bv_len = len;
3196                obj_overlap -= len;
3197        }
3198
3199        rbd_assert(!obj_overlap);
3200        return 0;
3201}
3202
3203/*
3204 * The target object doesn't exist.  Read the data for the entire
3205 * target object up to the overlap point (if any) from the parent,
3206 * so we can use it for a copyup.
3207 */
3208static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3209{
3210        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3211        int ret;
3212
3213        rbd_assert(obj_req->num_img_extents);
3214        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3215                      rbd_dev->parent_overlap);
3216        if (!obj_req->num_img_extents) {
3217                /*
3218                 * The overlap has become 0 (most likely because the
3219                 * image has been flattened).  Re-submit the original write
3220                 * request -- pass MODS_ONLY since the copyup isn't needed
3221                 * anymore.
3222                 */
3223                return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3224        }
3225
3226        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3227        if (ret)
3228                return ret;
3229
3230        return rbd_obj_read_from_parent(obj_req);
3231}
3232
3233static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3234{
3235        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3236        struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3237        u8 new_state;
3238        u32 i;
3239        int ret;
3240
3241        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3242
3243        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3244                return;
3245
3246        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3247                return;
3248
3249        for (i = 0; i < snapc->num_snaps; i++) {
3250                if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3251                    i + 1 < snapc->num_snaps)
3252                        new_state = OBJECT_EXISTS_CLEAN;
3253                else
3254                        new_state = OBJECT_EXISTS;
3255
3256                ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3257                                            new_state, NULL);
3258                if (ret < 0) {
3259                        obj_req->pending.result = ret;
3260                        return;
3261                }
3262
3263                rbd_assert(!ret);
3264                obj_req->pending.num_pending++;
3265        }
3266}
3267
3268static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3269{
3270        u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3271        int ret;
3272
3273        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3274
3275        /*
3276         * Only send non-zero copyup data to save some I/O and network
3277         * bandwidth -- zero copyup data is equivalent to the object not
3278         * existing.
3279         */
3280        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3281                bytes = 0;
3282
3283        if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3284                /*
3285                 * Send a copyup request with an empty snapshot context to
3286                 * deep-copyup the object through all existing snapshots.
3287                 * A second request with the current snapshot context will be
3288                 * sent for the actual modification.
3289                 */
3290                ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3291                if (ret) {
3292                        obj_req->pending.result = ret;
3293                        return;
3294                }
3295
3296                obj_req->pending.num_pending++;
3297                bytes = MODS_ONLY;
3298        }
3299
3300        ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3301        if (ret) {
3302                obj_req->pending.result = ret;
3303                return;
3304        }
3305
3306        obj_req->pending.num_pending++;
3307}
3308
3309static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3310{
3311        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3312        int ret;
3313
3314again:
3315        switch (obj_req->copyup_state) {
3316        case RBD_OBJ_COPYUP_START:
3317                rbd_assert(!*result);
3318
3319                ret = rbd_obj_copyup_read_parent(obj_req);
3320                if (ret) {
3321                        *result = ret;
3322                        return true;
3323                }
3324                if (obj_req->num_img_extents)
3325                        obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3326                else
3327                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3328                return false;
3329        case RBD_OBJ_COPYUP_READ_PARENT:
3330                if (*result)
3331                        return true;
3332
3333                if (is_zero_bvecs(obj_req->copyup_bvecs,
3334                                  rbd_obj_img_extents_bytes(obj_req))) {
3335                        dout("%s %p detected zeros\n", __func__, obj_req);
3336                        obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3337                }
3338
3339                rbd_obj_copyup_object_maps(obj_req);
3340                if (!obj_req->pending.num_pending) {
3341                        *result = obj_req->pending.result;
3342                        obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3343                        goto again;
3344                }
3345                obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3346                return false;
3347        case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3348                if (!pending_result_dec(&obj_req->pending, result))
3349                        return false;
3350                /* fall through */
3351        case RBD_OBJ_COPYUP_OBJECT_MAPS:
3352                if (*result) {
3353                        rbd_warn(rbd_dev, "snap object map update failed: %d",
3354                                 *result);
3355                        return true;
3356                }
3357
3358                rbd_obj_copyup_write_object(obj_req);
3359                if (!obj_req->pending.num_pending) {
3360                        *result = obj_req->pending.result;
3361                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3362                        goto again;
3363                }
3364                obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3365                return false;
3366        case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3367                if (!pending_result_dec(&obj_req->pending, result))
3368                        return false;
3369                /* fall through */
3370        case RBD_OBJ_COPYUP_WRITE_OBJECT:
3371                return true;
3372        default:
3373                BUG();
3374        }
3375}
3376
3377/*
3378 * Return:
3379 *   0 - object map update sent
3380 *   1 - object map update isn't needed
3381 *  <0 - error
3382 */
3383static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3384{
3385        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3386        u8 current_state = OBJECT_PENDING;
3387
3388        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3389                return 1;
3390
3391        if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3392                return 1;
3393
3394        return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3395                                     &current_state);
3396}
3397
3398static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3399{
3400        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3401        int ret;
3402
3403again:
3404        switch (obj_req->write_state) {
3405        case RBD_OBJ_WRITE_START:
3406                rbd_assert(!*result);
3407
3408                if (rbd_obj_write_is_noop(obj_req))
3409                        return true;
3410
3411                ret = rbd_obj_write_pre_object_map(obj_req);
3412                if (ret < 0) {
3413                        *result = ret;
3414                        return true;
3415                }
3416                obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3417                if (ret > 0)
3418                        goto again;
3419                return false;
3420        case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3421                if (*result) {
3422                        rbd_warn(rbd_dev, "pre object map update failed: %d",
3423                                 *result);
3424                        return true;
3425                }
3426                ret = rbd_obj_write_object(obj_req);
3427                if (ret) {
3428                        *result = ret;
3429                        return true;
3430                }
3431                obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3432                return false;
3433        case RBD_OBJ_WRITE_OBJECT:
3434                if (*result == -ENOENT) {
3435                        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3436                                *result = 0;
3437                                obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3438                                obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3439                                goto again;
3440                        }
3441                        /*
3442                         * On a non-existent object:
3443                         *   delete - -ENOENT, truncate/zero - 0
3444                         */
3445                        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3446                                *result = 0;
3447                }
3448                if (*result)
3449                        return true;
3450
3451                obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3452                goto again;
3453        case __RBD_OBJ_WRITE_COPYUP:
3454                if (!rbd_obj_advance_copyup(obj_req, result))
3455                        return false;
3456                /* fall through */
3457        case RBD_OBJ_WRITE_COPYUP:
3458                if (*result) {
3459                        rbd_warn(rbd_dev, "copyup failed: %d", *result);
3460                        return true;
3461                }
3462                ret = rbd_obj_write_post_object_map(obj_req);
3463                if (ret < 0) {
3464                        *result = ret;
3465                        return true;
3466                }
3467                obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3468                if (ret > 0)
3469                        goto again;
3470                return false;
3471        case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3472                if (*result)
3473                        rbd_warn(rbd_dev, "post object map update failed: %d",
3474                                 *result);
3475                return true;
3476        default:
3477                BUG();
3478        }
3479}
3480
3481/*
3482 * Return true if @obj_req is completed.
3483 */
3484static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3485                                     int *result)
3486{
3487        struct rbd_img_request *img_req = obj_req->img_request;
3488        struct rbd_device *rbd_dev = img_req->rbd_dev;
3489        bool done;
3490
3491        mutex_lock(&obj_req->state_mutex);
3492        if (!rbd_img_is_write(img_req))
3493                done = rbd_obj_advance_read(obj_req, result);
3494        else
3495                done = rbd_obj_advance_write(obj_req, result);
3496        mutex_unlock(&obj_req->state_mutex);
3497
3498        if (done && *result) {
3499                rbd_assert(*result < 0);
3500                rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3501                         obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3502                         obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3503        }
3504        return done;
3505}
3506
3507/*
3508 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3509 * recursion.
3510 */
3511static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3512{
3513        if (__rbd_obj_handle_request(obj_req, &result))
3514                rbd_img_handle_request(obj_req->img_request, result);
3515}
3516
3517static bool need_exclusive_lock(struct rbd_img_request *img_req)
3518{
3519        struct rbd_device *rbd_dev = img_req->rbd_dev;
3520
3521        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3522                return false;
3523
3524        if (rbd_is_ro(rbd_dev))
3525                return false;
3526
3527        rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3528        if (rbd_dev->opts->lock_on_read ||
3529            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3530                return true;
3531
3532        return rbd_img_is_write(img_req);
3533}
3534
3535static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3536{
3537        struct rbd_device *rbd_dev = img_req->rbd_dev;
3538        bool locked;
3539
3540        lockdep_assert_held(&rbd_dev->lock_rwsem);
3541        locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3542        spin_lock(&rbd_dev->lock_lists_lock);
3543        rbd_assert(list_empty(&img_req->lock_item));
3544        if (!locked)
3545                list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3546        else
3547                list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3548        spin_unlock(&rbd_dev->lock_lists_lock);
3549        return locked;
3550}
3551
3552static void rbd_lock_del_request(struct rbd_img_request *img_req)
3553{
3554        struct rbd_device *rbd_dev = img_req->rbd_dev;
3555        bool need_wakeup;
3556
3557        lockdep_assert_held(&rbd_dev->lock_rwsem);
3558        spin_lock(&rbd_dev->lock_lists_lock);
3559        rbd_assert(!list_empty(&img_req->lock_item));
3560        list_del_init(&img_req->lock_item);
3561        need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3562                       list_empty(&rbd_dev->running_list));
3563        spin_unlock(&rbd_dev->lock_lists_lock);
3564        if (need_wakeup)
3565                complete(&rbd_dev->releasing_wait);
3566}
3567
3568static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3569{
3570        struct rbd_device *rbd_dev = img_req->rbd_dev;
3571
3572        if (!need_exclusive_lock(img_req))
3573                return 1;
3574
3575        if (rbd_lock_add_request(img_req))
3576                return 1;
3577
3578        if (rbd_dev->opts->exclusive) {
3579                WARN_ON(1); /* lock got released? */
3580                return -EROFS;
3581        }
3582
3583        /*
3584         * Note the use of mod_delayed_work() in rbd_acquire_lock()
3585         * and cancel_delayed_work() in wake_lock_waiters().
3586         */
3587        dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3588        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3589        return 0;
3590}
3591
3592static void rbd_img_object_requests(struct rbd_img_request *img_req)
3593{
3594        struct rbd_obj_request *obj_req;
3595
3596        rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3597
3598        for_each_obj_request(img_req, obj_req) {
3599                int result = 0;
3600
3601                if (__rbd_obj_handle_request(obj_req, &result)) {
3602                        if (result) {
3603                                img_req->pending.result = result;
3604                                return;
3605                        }
3606                } else {
3607                        img_req->pending.num_pending++;
3608                }
3609        }
3610}
3611
3612static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3613{
3614        struct rbd_device *rbd_dev = img_req->rbd_dev;
3615        int ret;
3616
3617again:
3618        switch (img_req->state) {
3619        case RBD_IMG_START:
3620                rbd_assert(!*result);
3621
3622                ret = rbd_img_exclusive_lock(img_req);
3623                if (ret < 0) {
3624                        *result = ret;
3625                        return true;
3626                }
3627                img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3628                if (ret > 0)
3629                        goto again;
3630                return false;
3631        case RBD_IMG_EXCLUSIVE_LOCK:
3632                if (*result)
3633                        return true;
3634
3635                rbd_assert(!need_exclusive_lock(img_req) ||
3636                           __rbd_is_lock_owner(rbd_dev));
3637
3638                rbd_img_object_requests(img_req);
3639                if (!img_req->pending.num_pending) {
3640                        *result = img_req->pending.result;
3641                        img_req->state = RBD_IMG_OBJECT_REQUESTS;
3642                        goto again;
3643                }
3644                img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3645                return false;
3646        case __RBD_IMG_OBJECT_REQUESTS:
3647                if (!pending_result_dec(&img_req->pending, result))
3648                        return false;
3649                /* fall through */
3650        case RBD_IMG_OBJECT_REQUESTS:
3651                return true;
3652        default:
3653                BUG();
3654        }
3655}
3656
3657/*
3658 * Return true if @img_req is completed.
3659 */
3660static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3661                                     int *result)
3662{
3663        struct rbd_device *rbd_dev = img_req->rbd_dev;
3664        bool done;
3665
3666        if (need_exclusive_lock(img_req)) {
3667                down_read(&rbd_dev->lock_rwsem);
3668                mutex_lock(&img_req->state_mutex);
3669                done = rbd_img_advance(img_req, result);
3670                if (done)
3671                        rbd_lock_del_request(img_req);
3672                mutex_unlock(&img_req->state_mutex);
3673                up_read(&rbd_dev->lock_rwsem);
3674        } else {
3675                mutex_lock(&img_req->state_mutex);
3676                done = rbd_img_advance(img_req, result);
3677                mutex_unlock(&img_req->state_mutex);
3678        }
3679
3680        if (done && *result) {
3681                rbd_assert(*result < 0);
3682                rbd_warn(rbd_dev, "%s%s result %d",
3683                      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3684                      obj_op_name(img_req->op_type), *result);
3685        }
3686        return done;
3687}
3688
3689static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3690{
3691again:
3692        if (!__rbd_img_handle_request(img_req, &result))
3693                return;
3694
3695        if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3696                struct rbd_obj_request *obj_req = img_req->obj_request;
3697
3698                rbd_img_request_destroy(img_req);
3699                if (__rbd_obj_handle_request(obj_req, &result)) {
3700                        img_req = obj_req->img_request;
3701                        goto again;
3702                }
3703        } else {
3704                struct request *rq = blk_mq_rq_from_pdu(img_req);
3705
3706                rbd_img_request_destroy(img_req);
3707                blk_mq_end_request(rq, errno_to_blk_status(result));
3708        }
3709}
3710
3711static const struct rbd_client_id rbd_empty_cid;
3712
3713static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3714                          const struct rbd_client_id *rhs)
3715{
3716        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3717}
3718
3719static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3720{
3721        struct rbd_client_id cid;
3722
3723        mutex_lock(&rbd_dev->watch_mutex);
3724        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3725        cid.handle = rbd_dev->watch_cookie;
3726        mutex_unlock(&rbd_dev->watch_mutex);
3727        return cid;
3728}
3729
3730/*
3731 * lock_rwsem must be held for write
3732 */
3733static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3734                              const struct rbd_client_id *cid)
3735{
3736        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3737             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3738             cid->gid, cid->handle);
3739        rbd_dev->owner_cid = *cid; /* struct */
3740}
3741
3742static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3743{
3744        mutex_lock(&rbd_dev->watch_mutex);
3745        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3746        mutex_unlock(&rbd_dev->watch_mutex);
3747}
3748
3749static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3750{
3751        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3752
3753        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3754        strcpy(rbd_dev->lock_cookie, cookie);
3755        rbd_set_owner_cid(rbd_dev, &cid);
3756        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3757}
3758
3759/*
3760 * lock_rwsem must be held for write
3761 */
3762static int rbd_lock(struct rbd_device *rbd_dev)
3763{
3764        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3765        char cookie[32];
3766        int ret;
3767
3768        WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3769                rbd_dev->lock_cookie[0] != '\0');
3770
3771        format_lock_cookie(rbd_dev, cookie);
3772        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3773                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3774                            RBD_LOCK_TAG, "", 0);
3775        if (ret)
3776                return ret;
3777
3778        __rbd_lock(rbd_dev, cookie);
3779        return 0;
3780}
3781
3782/*
3783 * lock_rwsem must be held for write
3784 */
3785static void rbd_unlock(struct rbd_device *rbd_dev)
3786{
3787        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3788        int ret;
3789
3790        WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3791                rbd_dev->lock_cookie[0] == '\0');
3792
3793        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3794                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
3795        if (ret && ret != -ENOENT)
3796                rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3797
3798        /* treat errors as the image is unlocked */
3799        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3800        rbd_dev->lock_cookie[0] = '\0';
3801        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3802        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3803}
3804
3805static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3806                                enum rbd_notify_op notify_op,
3807                                struct page ***preply_pages,
3808                                size_t *preply_len)
3809{
3810        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3811        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3812        char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3813        int buf_size = sizeof(buf);
3814        void *p = buf;
3815
3816        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3817
3818        /* encode *LockPayload NotifyMessage (op + ClientId) */
3819        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3820        ceph_encode_32(&p, notify_op);
3821        ceph_encode_64(&p, cid.gid);
3822        ceph_encode_64(&p, cid.handle);
3823
3824        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3825                                &rbd_dev->header_oloc, buf, buf_size,
3826                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3827}
3828
3829static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3830                               enum rbd_notify_op notify_op)
3831{
3832        __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3833}
3834
3835static void rbd_notify_acquired_lock(struct work_struct *work)
3836{
3837        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3838                                                  acquired_lock_work);
3839
3840        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3841}
3842
3843static void rbd_notify_released_lock(struct work_struct *work)
3844{
3845        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3846                                                  released_lock_work);
3847
3848        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3849}
3850
3851static int rbd_request_lock(struct rbd_device *rbd_dev)
3852{
3853        struct page **reply_pages;
3854        size_t reply_len;
3855        bool lock_owner_responded = false;
3856        int ret;
3857
3858        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3859
3860        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3861                                   &reply_pages, &reply_len);
3862        if (ret && ret != -ETIMEDOUT) {
3863                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3864                goto out;
3865        }
3866
3867        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3868                void *p = page_address(reply_pages[0]);
3869                void *const end = p + reply_len;
3870                u32 n;
3871
3872                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3873                while (n--) {
3874                        u8 struct_v;
3875                        u32 len;
3876
3877                        ceph_decode_need(&p, end, 8 + 8, e_inval);
3878                        p += 8 + 8; /* skip gid and cookie */
3879
3880                        ceph_decode_32_safe(&p, end, len, e_inval);
3881                        if (!len)
3882                                continue;
3883
3884                        if (lock_owner_responded) {
3885                                rbd_warn(rbd_dev,
3886                                         "duplicate lock owners detected");
3887                                ret = -EIO;
3888                                goto out;
3889                        }
3890
3891                        lock_owner_responded = true;
3892                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3893                                                  &struct_v, &len);
3894                        if (ret) {
3895                                rbd_warn(rbd_dev,
3896                                         "failed to decode ResponseMessage: %d",
3897                                         ret);
3898                                goto e_inval;
3899                        }
3900
3901                        ret = ceph_decode_32(&p);
3902                }
3903        }
3904
3905        if (!lock_owner_responded) {
3906                rbd_warn(rbd_dev, "no lock owners detected");
3907                ret = -ETIMEDOUT;
3908        }
3909
3910out:
3911        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3912        return ret;
3913
3914e_inval:
3915        ret = -EINVAL;
3916        goto out;
3917}
3918
3919/*
3920 * Either image request state machine(s) or rbd_add_acquire_lock()
3921 * (i.e. "rbd map").
3922 */
3923static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3924{
3925        struct rbd_img_request *img_req;
3926
3927        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3928        lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
3929
3930        cancel_delayed_work(&rbd_dev->lock_dwork);
3931        if (!completion_done(&rbd_dev->acquire_wait)) {
3932                rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3933                           list_empty(&rbd_dev->running_list));
3934                rbd_dev->acquire_err = result;
3935                complete_all(&rbd_dev->acquire_wait);
3936                return;
3937        }
3938
3939        list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3940                mutex_lock(&img_req->state_mutex);
3941                rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3942                rbd_img_schedule(img_req, result);
3943                mutex_unlock(&img_req->state_mutex);
3944        }
3945
3946        list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3947}
3948
3949static int get_lock_owner_info(struct rbd_device *rbd_dev,
3950                               struct ceph_locker **lockers, u32 *num_lockers)
3951{
3952        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3953        u8 lock_type;
3954        char *lock_tag;
3955        int ret;
3956
3957        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3958
3959        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3960                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3961                                 &lock_type, &lock_tag, lockers, num_lockers);
3962        if (ret)
3963                return ret;
3964
3965        if (*num_lockers == 0) {
3966                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3967                goto out;
3968        }
3969
3970        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3971                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3972                         lock_tag);
3973                ret = -EBUSY;
3974                goto out;
3975        }
3976
3977        if (lock_type == CEPH_CLS_LOCK_SHARED) {
3978                rbd_warn(rbd_dev, "shared lock type detected");
3979                ret = -EBUSY;
3980                goto out;
3981        }
3982
3983        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3984                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3985                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3986                         (*lockers)[0].id.cookie);
3987                ret = -EBUSY;
3988                goto out;
3989        }
3990
3991out:
3992        kfree(lock_tag);
3993        return ret;
3994}
3995
3996static int find_watcher(struct rbd_device *rbd_dev,
3997                        const struct ceph_locker *locker)
3998{
3999        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4000        struct ceph_watch_item *watchers;
4001        u32 num_watchers;
4002        u64 cookie;
4003        int i;
4004        int ret;
4005
4006        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4007                                      &rbd_dev->header_oloc, &watchers,
4008                                      &num_watchers);
4009        if (ret)
4010                return ret;
4011
4012        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4013        for (i = 0; i < num_watchers; i++) {
4014                /*
4015                 * Ignore addr->type while comparing.  This mimics
4016                 * entity_addr_t::get_legacy_str() + strcmp().
4017                 */
4018                if (ceph_addr_equal_no_type(&watchers[i].addr,
4019                                            &locker->info.addr) &&
4020                    watchers[i].cookie == cookie) {
4021                        struct rbd_client_id cid = {
4022                                .gid = le64_to_cpu(watchers[i].name.num),
4023                                .handle = cookie,
4024                        };
4025
4026                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4027                             rbd_dev, cid.gid, cid.handle);
4028                        rbd_set_owner_cid(rbd_dev, &cid);
4029                        ret = 1;
4030                        goto out;
4031                }
4032        }
4033
4034        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
4035        ret = 0;
4036out:
4037        kfree(watchers);
4038        return ret;
4039}
4040
4041/*
4042 * lock_rwsem must be held for write
4043 */
4044static int rbd_try_lock(struct rbd_device *rbd_dev)
4045{
4046        struct ceph_client *client = rbd_dev->rbd_client->client;
4047        struct ceph_locker *lockers;
4048        u32 num_lockers;
4049        int ret;
4050
4051        for (;;) {
4052                ret = rbd_lock(rbd_dev);
4053                if (ret != -EBUSY)
4054                        return ret;
4055
4056                /* determine if the current lock holder is still alive */
4057                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4058                if (ret)
4059                        return ret;
4060
4061                if (num_lockers == 0)
4062                        goto again;
4063
4064                ret = find_watcher(rbd_dev, lockers);
4065                if (ret)
4066                        goto out; /* request lock or error */
4067
4068                rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4069                         ENTITY_NAME(lockers[0].id.name));
4070
4071                ret = ceph_monc_blocklist_add(&client->monc,
4072                                              &lockers[0].info.addr);
4073                if (ret) {
4074                        rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
4075                                 ENTITY_NAME(lockers[0].id.name), ret);
4076                        goto out;
4077                }
4078
4079                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4080                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
4081                                          lockers[0].id.cookie,
4082                                          &lockers[0].id.name);
4083                if (ret && ret != -ENOENT)
4084                        goto out;
4085
4086again:
4087                ceph_free_lockers(lockers, num_lockers);
4088        }
4089
4090out:
4091        ceph_free_lockers(lockers, num_lockers);
4092        return ret;
4093}
4094
4095static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4096{
4097        int ret;
4098
4099        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4100                ret = rbd_object_map_open(rbd_dev);
4101                if (ret)
4102                        return ret;
4103        }
4104
4105        return 0;
4106}
4107
4108/*
4109 * Return:
4110 *   0 - lock acquired
4111 *   1 - caller should call rbd_request_lock()
4112 *  <0 - error
4113 */
4114static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4115{
4116        int ret;
4117
4118        down_read(&rbd_dev->lock_rwsem);
4119        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4120             rbd_dev->lock_state);
4121        if (__rbd_is_lock_owner(rbd_dev)) {
4122                up_read(&rbd_dev->lock_rwsem);
4123                return 0;
4124        }
4125
4126        up_read(&rbd_dev->lock_rwsem);
4127        down_write(&rbd_dev->lock_rwsem);
4128        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4129             rbd_dev->lock_state);
4130        if (__rbd_is_lock_owner(rbd_dev)) {
4131                up_write(&rbd_dev->lock_rwsem);
4132                return 0;
4133        }
4134
4135        ret = rbd_try_lock(rbd_dev);
4136        if (ret < 0) {
4137                rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4138                if (ret == -EBLOCKLISTED)
4139                        goto out;
4140
4141                ret = 1; /* request lock anyway */
4142        }
4143        if (ret > 0) {
4144                up_write(&rbd_dev->lock_rwsem);
4145                return ret;
4146        }
4147
4148        rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4149        rbd_assert(list_empty(&rbd_dev->running_list));
4150
4151        ret = rbd_post_acquire_action(rbd_dev);
4152        if (ret) {
4153                rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4154                /*
4155                 * Can't stay in RBD_LOCK_STATE_LOCKED because
4156                 * rbd_lock_add_request() would let the request through,
4157                 * assuming that e.g. object map is locked and loaded.
4158                 */
4159                rbd_unlock(rbd_dev);
4160        }
4161
4162out:
4163        wake_lock_waiters(rbd_dev, ret);
4164        up_write(&rbd_dev->lock_rwsem);
4165        return ret;
4166}
4167
4168static void rbd_acquire_lock(struct work_struct *work)
4169{
4170        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4171                                            struct rbd_device, lock_dwork);
4172        int ret;
4173
4174        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4175again:
4176        ret = rbd_try_acquire_lock(rbd_dev);
4177        if (ret <= 0) {
4178                dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4179                return;
4180        }
4181
4182        ret = rbd_request_lock(rbd_dev);
4183        if (ret == -ETIMEDOUT) {
4184                goto again; /* treat this as a dead client */
4185        } else if (ret == -EROFS) {
4186                rbd_warn(rbd_dev, "peer will not release lock");
4187                down_write(&rbd_dev->lock_rwsem);
4188                wake_lock_waiters(rbd_dev, ret);
4189                up_write(&rbd_dev->lock_rwsem);
4190        } else if (ret < 0) {
4191                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4192                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4193                                 RBD_RETRY_DELAY);
4194        } else {
4195                /*
4196                 * lock owner acked, but resend if we don't see them
4197                 * release the lock
4198                 */
4199                dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4200                     rbd_dev);
4201                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4202                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4203        }
4204}
4205
4206static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4207{
4208        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4209        lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
4210
4211        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4212                return false;
4213
4214        /*
4215         * Ensure that all in-flight IO is flushed.
4216         */
4217        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4218        rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4219        if (list_empty(&rbd_dev->running_list))
4220                return true;
4221
4222        up_write(&rbd_dev->lock_rwsem);
4223        wait_for_completion(&rbd_dev->releasing_wait);
4224
4225        down_write(&rbd_dev->lock_rwsem);
4226        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4227                return false;
4228
4229        rbd_assert(list_empty(&rbd_dev->running_list));
4230        return true;
4231}
4232
4233static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4234{
4235        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4236                rbd_object_map_close(rbd_dev);
4237}
4238
4239static void __rbd_release_lock(struct rbd_device *rbd_dev)
4240{
4241        rbd_assert(list_empty(&rbd_dev->running_list));
4242
4243        rbd_pre_release_action(rbd_dev);
4244        rbd_unlock(rbd_dev);
4245}
4246
4247/*
4248 * lock_rwsem must be held for write
4249 */
4250static void rbd_release_lock(struct rbd_device *rbd_dev)
4251{
4252        if (!rbd_quiesce_lock(rbd_dev))
4253                return;
4254
4255        __rbd_release_lock(rbd_dev);
4256
4257        /*
4258         * Give others a chance to grab the lock - we would re-acquire
4259         * almost immediately if we got new IO while draining the running
4260         * list otherwise.  We need to ack our own notifications, so this
4261         * lock_dwork will be requeued from rbd_handle_released_lock() by
4262         * way of maybe_kick_acquire().
4263         */
4264        cancel_delayed_work(&rbd_dev->lock_dwork);
4265}
4266
4267static void rbd_release_lock_work(struct work_struct *work)
4268{
4269        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4270                                                  unlock_work);
4271
4272        down_write(&rbd_dev->lock_rwsem);
4273        rbd_release_lock(rbd_dev);
4274        up_write(&rbd_dev->lock_rwsem);
4275}
4276
4277static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4278{
4279        bool have_requests;
4280
4281        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4282        if (__rbd_is_lock_owner(rbd_dev))
4283                return;
4284
4285        spin_lock(&rbd_dev->lock_lists_lock);
4286        have_requests = !list_empty(&rbd_dev->acquiring_list);
4287        spin_unlock(&rbd_dev->lock_lists_lock);
4288        if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4289                dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4290                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4291        }
4292}
4293
4294static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4295                                     void **p)
4296{
4297        struct rbd_client_id cid = { 0 };
4298
4299        if (struct_v >= 2) {
4300                cid.gid = ceph_decode_64(p);
4301                cid.handle = ceph_decode_64(p);
4302        }
4303
4304        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4305             cid.handle);
4306        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4307                down_write(&rbd_dev->lock_rwsem);
4308                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4309                        dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4310                             __func__, rbd_dev, cid.gid, cid.handle);
4311                } else {
4312                        rbd_set_owner_cid(rbd_dev, &cid);
4313                }
4314                downgrade_write(&rbd_dev->lock_rwsem);
4315        } else {
4316                down_read(&rbd_dev->lock_rwsem);
4317        }
4318
4319        maybe_kick_acquire(rbd_dev);
4320        up_read(&rbd_dev->lock_rwsem);
4321}
4322
4323static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4324                                     void **p)
4325{
4326        struct rbd_client_id cid = { 0 };
4327
4328        if (struct_v >= 2) {
4329                cid.gid = ceph_decode_64(p);
4330                cid.handle = ceph_decode_64(p);
4331        }
4332
4333        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4334             cid.handle);
4335        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4336                down_write(&rbd_dev->lock_rwsem);
4337                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4338                        dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4339                             __func__, rbd_dev, cid.gid, cid.handle,
4340                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4341                } else {
4342                        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4343                }
4344                downgrade_write(&rbd_dev->lock_rwsem);
4345        } else {
4346                down_read(&rbd_dev->lock_rwsem);
4347        }
4348
4349        maybe_kick_acquire(rbd_dev);
4350        up_read(&rbd_dev->lock_rwsem);
4351}
4352
4353/*
4354 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4355 * ResponseMessage is needed.
4356 */
4357static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4358                                   void **p)
4359{
4360        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4361        struct rbd_client_id cid = { 0 };
4362        int result = 1;
4363
4364        if (struct_v >= 2) {
4365                cid.gid = ceph_decode_64(p);
4366                cid.handle = ceph_decode_64(p);
4367        }
4368
4369        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4370             cid.handle);
4371        if (rbd_cid_equal(&cid, &my_cid))
4372                return result;
4373
4374        down_read(&rbd_dev->lock_rwsem);
4375        if (__rbd_is_lock_owner(rbd_dev)) {
4376                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4377                    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4378                        goto out_unlock;
4379
4380                /*
4381                 * encode ResponseMessage(0) so the peer can detect
4382                 * a missing owner
4383                 */
4384                result = 0;
4385
4386                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4387                        if (!rbd_dev->opts->exclusive) {
4388                                dout("%s rbd_dev %p queueing unlock_work\n",
4389                                     __func__, rbd_dev);
4390                                queue_work(rbd_dev->task_wq,
4391                                           &rbd_dev->unlock_work);
4392                        } else {
4393                                /* refuse to release the lock */
4394                                result = -EROFS;
4395                        }
4396                }
4397        }
4398
4399out_unlock:
4400        up_read(&rbd_dev->lock_rwsem);
4401        return result;
4402}
4403
4404static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4405                                     u64 notify_id, u64 cookie, s32 *result)
4406{
4407        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4408        char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4409        int buf_size = sizeof(buf);
4410        int ret;
4411
4412        if (result) {
4413                void *p = buf;
4414
4415                /* encode ResponseMessage */
4416                ceph_start_encoding(&p, 1, 1,
4417                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
4418                ceph_encode_32(&p, *result);
4419        } else {
4420                buf_size = 0;
4421        }
4422
4423        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4424                                   &rbd_dev->header_oloc, notify_id, cookie,
4425                                   buf, buf_size);
4426        if (ret)
4427                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4428}
4429
4430static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4431                                   u64 cookie)
4432{
4433        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4434        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4435}
4436
4437static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4438                                          u64 notify_id, u64 cookie, s32 result)
4439{
4440        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4441        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4442}
4443
4444static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4445                         u64 notifier_id, void *data, size_t data_len)
4446{
4447        struct rbd_device *rbd_dev = arg;
4448        void *p = data;
4449        void *const end = p + data_len;
4450        u8 struct_v = 0;
4451        u32 len;
4452        u32 notify_op;
4453        int ret;
4454
4455        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4456             __func__, rbd_dev, cookie, notify_id, data_len);
4457        if (data_len) {
4458                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4459                                          &struct_v, &len);
4460                if (ret) {
4461                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4462                                 ret);
4463                        return;
4464                }
4465
4466                notify_op = ceph_decode_32(&p);
4467        } else {
4468                /* legacy notification for header updates */
4469                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4470                len = 0;
4471        }
4472
4473        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4474        switch (notify_op) {
4475        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4476                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4477                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4478                break;
4479        case RBD_NOTIFY_OP_RELEASED_LOCK:
4480                rbd_handle_released_lock(rbd_dev, struct_v, &p);
4481                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4482                break;
4483        case RBD_NOTIFY_OP_REQUEST_LOCK:
4484                ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4485                if (ret <= 0)
4486                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4487                                                      cookie, ret);
4488                else
4489                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4490                break;
4491        case RBD_NOTIFY_OP_HEADER_UPDATE:
4492                ret = rbd_dev_refresh(rbd_dev);
4493                if (ret)
4494                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
4495
4496                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4497                break;
4498        default:
4499                if (rbd_is_lock_owner(rbd_dev))
4500                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4501                                                      cookie, -EOPNOTSUPP);
4502                else
4503                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4504                break;
4505        }
4506}
4507
4508static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4509
4510static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4511{
4512        struct rbd_device *rbd_dev = arg;
4513
4514        rbd_warn(rbd_dev, "encountered watch error: %d", err);
4515
4516        down_write(&rbd_dev->lock_rwsem);
4517        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4518        up_write(&rbd_dev->lock_rwsem);
4519
4520        mutex_lock(&rbd_dev->watch_mutex);
4521        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4522                __rbd_unregister_watch(rbd_dev);
4523                rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4524
4525                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4526        }
4527        mutex_unlock(&rbd_dev->watch_mutex);
4528}
4529
4530/*
4531 * watch_mutex must be locked
4532 */
4533static int __rbd_register_watch(struct rbd_device *rbd_dev)
4534{
4535        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4536        struct ceph_osd_linger_request *handle;
4537
4538        rbd_assert(!rbd_dev->watch_handle);
4539        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4540
4541        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4542                                 &rbd_dev->header_oloc, rbd_watch_cb,
4543                                 rbd_watch_errcb, rbd_dev);
4544        if (IS_ERR(handle))
4545                return PTR_ERR(handle);
4546
4547        rbd_dev->watch_handle = handle;
4548        return 0;
4549}
4550
4551/*
4552 * watch_mutex must be locked
4553 */
4554static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4555{
4556        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4557        int ret;
4558
4559        rbd_assert(rbd_dev->watch_handle);
4560        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4561
4562        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4563        if (ret)
4564                rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4565
4566        rbd_dev->watch_handle = NULL;
4567}
4568
4569static int rbd_register_watch(struct rbd_device *rbd_dev)
4570{
4571        int ret;
4572
4573        mutex_lock(&rbd_dev->watch_mutex);
4574        rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4575        ret = __rbd_register_watch(rbd_dev);
4576        if (ret)
4577                goto out;
4578
4579        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4580        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4581
4582out:
4583        mutex_unlock(&rbd_dev->watch_mutex);
4584        return ret;
4585}
4586
4587static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4588{
4589        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4590
4591        cancel_work_sync(&rbd_dev->acquired_lock_work);
4592        cancel_work_sync(&rbd_dev->released_lock_work);
4593        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4594        cancel_work_sync(&rbd_dev->unlock_work);
4595}
4596
4597/*
4598 * header_rwsem must not be held to avoid a deadlock with
4599 * rbd_dev_refresh() when flushing notifies.
4600 */
4601static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4602{
4603        cancel_tasks_sync(rbd_dev);
4604
4605        mutex_lock(&rbd_dev->watch_mutex);
4606        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4607                __rbd_unregister_watch(rbd_dev);
4608        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4609        mutex_unlock(&rbd_dev->watch_mutex);
4610
4611        cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4612        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4613}
4614
4615/*
4616 * lock_rwsem must be held for write
4617 */
4618static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4619{
4620        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4621        char cookie[32];
4622        int ret;
4623
4624        if (!rbd_quiesce_lock(rbd_dev))
4625                return;
4626
4627        format_lock_cookie(rbd_dev, cookie);
4628        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4629                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4630                                  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4631                                  RBD_LOCK_TAG, cookie);
4632        if (ret) {
4633                if (ret != -EOPNOTSUPP)
4634                        rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4635                                 ret);
4636
4637                /*
4638                 * Lock cookie cannot be updated on older OSDs, so do
4639                 * a manual release and queue an acquire.
4640                 */
4641                __rbd_release_lock(rbd_dev);
4642                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4643        } else {
4644                __rbd_lock(rbd_dev, cookie);
4645                wake_lock_waiters(rbd_dev, 0);
4646        }
4647}
4648
4649static void rbd_reregister_watch(struct work_struct *work)
4650{
4651        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4652                                            struct rbd_device, watch_dwork);
4653        int ret;
4654
4655        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4656
4657        mutex_lock(&rbd_dev->watch_mutex);
4658        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4659                mutex_unlock(&rbd_dev->watch_mutex);
4660                return;
4661        }
4662
4663        ret = __rbd_register_watch(rbd_dev);
4664        if (ret) {
4665                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4666                if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4667                        queue_delayed_work(rbd_dev->task_wq,
4668                                           &rbd_dev->watch_dwork,
4669                                           RBD_RETRY_DELAY);
4670                        mutex_unlock(&rbd_dev->watch_mutex);
4671                        return;
4672                }
4673
4674                mutex_unlock(&rbd_dev->watch_mutex);
4675                down_write(&rbd_dev->lock_rwsem);
4676                wake_lock_waiters(rbd_dev, ret);
4677                up_write(&rbd_dev->lock_rwsem);
4678                return;
4679        }
4680
4681        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4682        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4683        mutex_unlock(&rbd_dev->watch_mutex);
4684
4685        down_write(&rbd_dev->lock_rwsem);
4686        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4687                rbd_reacquire_lock(rbd_dev);
4688        up_write(&rbd_dev->lock_rwsem);
4689
4690        ret = rbd_dev_refresh(rbd_dev);
4691        if (ret)
4692                rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4693}
4694
4695/*
4696 * Synchronous osd object method call.  Returns the number of bytes
4697 * returned in the outbound buffer, or a negative error code.
4698 */
4699static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4700                             struct ceph_object_id *oid,
4701                             struct ceph_object_locator *oloc,
4702                             const char *method_name,
4703                             const void *outbound,
4704                             size_t outbound_size,
4705                             void *inbound,
4706                             size_t inbound_size)
4707{
4708        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4709        struct page *req_page = NULL;
4710        struct page *reply_page;
4711        int ret;
4712
4713        /*
4714         * Method calls are ultimately read operations.  The result
4715         * should placed into the inbound buffer provided.  They
4716         * also supply outbound data--parameters for the object
4717         * method.  Currently if this is present it will be a
4718         * snapshot id.
4719         */
4720        if (outbound) {
4721                if (outbound_size > PAGE_SIZE)
4722                        return -E2BIG;
4723
4724                req_page = alloc_page(GFP_KERNEL);
4725                if (!req_page)
4726                        return -ENOMEM;
4727
4728                memcpy(page_address(req_page), outbound, outbound_size);
4729        }
4730
4731        reply_page = alloc_page(GFP_KERNEL);
4732        if (!reply_page) {
4733                if (req_page)
4734                        __free_page(req_page);
4735                return -ENOMEM;
4736        }
4737
4738        ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4739                             CEPH_OSD_FLAG_READ, req_page, outbound_size,
4740                             &reply_page, &inbound_size);
4741        if (!ret) {
4742                memcpy(inbound, page_address(reply_page), inbound_size);
4743                ret = inbound_size;
4744        }
4745
4746        if (req_page)
4747                __free_page(req_page);
4748        __free_page(reply_page);
4749        return ret;
4750}
4751
4752static void rbd_queue_workfn(struct work_struct *work)
4753{
4754        struct rbd_img_request *img_request =
4755            container_of(work, struct rbd_img_request, work);
4756        struct rbd_device *rbd_dev = img_request->rbd_dev;
4757        enum obj_operation_type op_type = img_request->op_type;
4758        struct request *rq = blk_mq_rq_from_pdu(img_request);
4759        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4760        u64 length = blk_rq_bytes(rq);
4761        u64 mapping_size;
4762        int result;
4763
4764        /* Ignore/skip any zero-length requests */
4765        if (!length) {
4766                dout("%s: zero-length request\n", __func__);
4767                result = 0;
4768                goto err_img_request;
4769        }
4770
4771        blk_mq_start_request(rq);
4772
4773        down_read(&rbd_dev->header_rwsem);
4774        mapping_size = rbd_dev->mapping.size;
4775        rbd_img_capture_header(img_request);
4776        up_read(&rbd_dev->header_rwsem);
4777
4778        if (offset + length > mapping_size) {
4779                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4780                         length, mapping_size);
4781                result = -EIO;
4782                goto err_img_request;
4783        }
4784
4785        dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4786             img_request, obj_op_name(op_type), offset, length);
4787
4788        if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4789                result = rbd_img_fill_nodata(img_request, offset, length);
4790        else
4791                result = rbd_img_fill_from_bio(img_request, offset, length,
4792                                               rq->bio);
4793        if (result)
4794                goto err_img_request;
4795
4796        rbd_img_handle_request(img_request, 0);
4797        return;
4798
4799err_img_request:
4800        rbd_img_request_destroy(img_request);
4801        if (result)
4802                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4803                         obj_op_name(op_type), length, offset, result);
4804        blk_mq_end_request(rq, errno_to_blk_status(result));
4805}
4806
4807static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4808                const struct blk_mq_queue_data *bd)
4809{
4810        struct rbd_device *rbd_dev = hctx->queue->queuedata;
4811        struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4812        enum obj_operation_type op_type;
4813
4814        switch (req_op(bd->rq)) {
4815        case REQ_OP_DISCARD:
4816                op_type = OBJ_OP_DISCARD;
4817                break;
4818        case REQ_OP_WRITE_ZEROES:
4819                op_type = OBJ_OP_ZEROOUT;
4820                break;
4821        case REQ_OP_WRITE:
4822                op_type = OBJ_OP_WRITE;
4823                break;
4824        case REQ_OP_READ:
4825                op_type = OBJ_OP_READ;
4826                break;
4827        default:
4828                rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4829                return BLK_STS_IOERR;
4830        }
4831
4832        rbd_img_request_init(img_req, rbd_dev, op_type);
4833
4834        if (rbd_img_is_write(img_req)) {
4835                if (rbd_is_ro(rbd_dev)) {
4836                        rbd_warn(rbd_dev, "%s on read-only mapping",
4837                                 obj_op_name(img_req->op_type));
4838                        return BLK_STS_IOERR;
4839                }
4840                rbd_assert(!rbd_is_snap(rbd_dev));
4841        }
4842
4843        INIT_WORK(&img_req->work, rbd_queue_workfn);
4844        queue_work(rbd_wq, &img_req->work);
4845        return BLK_STS_OK;
4846}
4847
4848static void rbd_free_disk(struct rbd_device *rbd_dev)
4849{
4850        blk_cleanup_queue(rbd_dev->disk->queue);
4851        blk_mq_free_tag_set(&rbd_dev->tag_set);
4852        put_disk(rbd_dev->disk);
4853        rbd_dev->disk = NULL;
4854}
4855
4856static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4857                             struct ceph_object_id *oid,
4858                             struct ceph_object_locator *oloc,
4859                             void *buf, int buf_len)
4860
4861{
4862        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4863        struct ceph_osd_request *req;
4864        struct page **pages;
4865        int num_pages = calc_pages_for(0, buf_len);
4866        int ret;
4867
4868        req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4869        if (!req)
4870                return -ENOMEM;
4871
4872        ceph_oid_copy(&req->r_base_oid, oid);
4873        ceph_oloc_copy(&req->r_base_oloc, oloc);
4874        req->r_flags = CEPH_OSD_FLAG_READ;
4875
4876        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4877        if (IS_ERR(pages)) {
4878                ret = PTR_ERR(pages);
4879                goto out_req;
4880        }
4881
4882        osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4883        osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4884                                         true);
4885
4886        ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4887        if (ret)
4888                goto out_req;
4889
4890        ceph_osdc_start_request(osdc, req, false);
4891        ret = ceph_osdc_wait_request(osdc, req);
4892        if (ret >= 0)
4893                ceph_copy_from_page_vector(pages, buf, 0, ret);
4894
4895out_req:
4896        ceph_osdc_put_request(req);
4897        return ret;
4898}
4899
4900/*
4901 * Read the complete header for the given rbd device.  On successful
4902 * return, the rbd_dev->header field will contain up-to-date
4903 * information about the image.
4904 */
4905static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4906{
4907        struct rbd_image_header_ondisk *ondisk = NULL;
4908        u32 snap_count = 0;
4909        u64 names_size = 0;
4910        u32 want_count;
4911        int ret;
4912
4913        /*
4914         * The complete header will include an array of its 64-bit
4915         * snapshot ids, followed by the names of those snapshots as
4916         * a contiguous block of NUL-terminated strings.  Note that
4917         * the number of snapshots could change by the time we read
4918         * it in, in which case we re-read it.
4919         */
4920        do {
4921                size_t size;
4922
4923                kfree(ondisk);
4924
4925                size = sizeof (*ondisk);
4926                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4927                size += names_size;
4928                ondisk = kmalloc(size, GFP_KERNEL);
4929                if (!ondisk)
4930                        return -ENOMEM;
4931
4932                ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4933                                        &rbd_dev->header_oloc, ondisk, size);
4934                if (ret < 0)
4935                        goto out;
4936                if ((size_t)ret < size) {
4937                        ret = -ENXIO;
4938                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4939                                size, ret);
4940                        goto out;
4941                }
4942                if (!rbd_dev_ondisk_valid(ondisk)) {
4943                        ret = -ENXIO;
4944                        rbd_warn(rbd_dev, "invalid header");
4945                        goto out;
4946                }
4947
4948                names_size = le64_to_cpu(ondisk->snap_names_len);
4949                want_count = snap_count;
4950                snap_count = le32_to_cpu(ondisk->snap_count);
4951        } while (snap_count != want_count);
4952
4953        ret = rbd_header_from_disk(rbd_dev, ondisk);
4954out:
4955        kfree(ondisk);
4956
4957        return ret;
4958}
4959
4960static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4961{
4962        sector_t size;
4963
4964        /*
4965         * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4966         * try to update its size.  If REMOVING is set, updating size
4967         * is just useless work since the device can't be opened.
4968         */
4969        if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4970            !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4971                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4972                dout("setting size to %llu sectors", (unsigned long long)size);
4973                set_capacity(rbd_dev->disk, size);
4974                revalidate_disk_size(rbd_dev->disk, true);
4975        }
4976}
4977
4978static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4979{
4980        u64 mapping_size;
4981        int ret;
4982
4983        down_write(&rbd_dev->header_rwsem);
4984        mapping_size = rbd_dev->mapping.size;
4985
4986        ret = rbd_dev_header_info(rbd_dev);
4987        if (ret)
4988                goto out;
4989
4990        /*
4991         * If there is a parent, see if it has disappeared due to the
4992         * mapped image getting flattened.
4993         */
4994        if (rbd_dev->parent) {
4995                ret = rbd_dev_v2_parent_info(rbd_dev);
4996                if (ret)
4997                        goto out;
4998        }
4999
5000        rbd_assert(!rbd_is_snap(rbd_dev));
5001        rbd_dev->mapping.size = rbd_dev->header.image_size;
5002
5003out:
5004        up_write(&rbd_dev->header_rwsem);
5005        if (!ret && mapping_size != rbd_dev->mapping.size)
5006                rbd_dev_update_size(rbd_dev);
5007
5008        return ret;
5009}
5010
5011static const struct blk_mq_ops rbd_mq_ops = {
5012        .queue_rq       = rbd_queue_rq,
5013};
5014
5015static int rbd_init_disk(struct rbd_device *rbd_dev)
5016{
5017        struct gendisk *disk;
5018        struct request_queue *q;
5019        unsigned int objset_bytes =
5020            rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5021        int err;
5022
5023        /* create gendisk info */
5024        disk = alloc_disk(single_major ?
5025                          (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5026                          RBD_MINORS_PER_MAJOR);
5027        if (!disk)
5028                return -ENOMEM;
5029
5030        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5031                 rbd_dev->dev_id);
5032        disk->major = rbd_dev->major;
5033        disk->first_minor = rbd_dev->minor;
5034        if (single_major)
5035                disk->flags |= GENHD_FL_EXT_DEVT;
5036        disk->fops = &rbd_bd_ops;
5037        disk->private_data = rbd_dev;
5038
5039        memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5040        rbd_dev->tag_set.ops = &rbd_mq_ops;
5041        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5042        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5043        rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5044        rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
5045        rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
5046
5047        err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5048        if (err)
5049                goto out_disk;
5050
5051        q = blk_mq_init_queue(&rbd_dev->tag_set);
5052        if (IS_ERR(q)) {
5053                err = PTR_ERR(q);
5054                goto out_tag_set;
5055        }
5056
5057        blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5058        /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5059
5060        blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5061        q->limits.max_sectors = queue_max_hw_sectors(q);
5062        blk_queue_max_segments(q, USHRT_MAX);
5063        blk_queue_max_segment_size(q, UINT_MAX);
5064        blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5065        blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5066
5067        if (rbd_dev->opts->trim) {
5068                blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5069                q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5070                blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5071                blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5072        }
5073
5074        if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5075                blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
5076
5077        /*
5078         * disk_release() expects a queue ref from add_disk() and will
5079         * put it.  Hold an extra ref until add_disk() is called.
5080         */
5081        WARN_ON(!blk_get_queue(q));
5082        disk->queue = q;
5083        q->queuedata = rbd_dev;
5084
5085        rbd_dev->disk = disk;
5086
5087        return 0;
5088out_tag_set:
5089        blk_mq_free_tag_set(&rbd_dev->tag_set);
5090out_disk:
5091        put_disk(disk);
5092        return err;
5093}
5094
5095/*
5096  sysfs
5097*/
5098
5099static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5100{
5101        return container_of(dev, struct rbd_device, dev);
5102}
5103
5104static ssize_t rbd_size_show(struct device *dev,
5105                             struct device_attribute *attr, char *buf)
5106{
5107        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5108
5109        return sprintf(buf, "%llu\n",
5110                (unsigned long long)rbd_dev->mapping.size);
5111}
5112
5113static ssize_t rbd_features_show(struct device *dev,
5114                             struct device_attribute *attr, char *buf)
5115{
5116        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5117
5118        return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5119}
5120
5121static ssize_t rbd_major_show(struct device *dev,
5122                              struct device_attribute *attr, char *buf)
5123{
5124        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5125
5126        if (rbd_dev->major)
5127                return sprintf(buf, "%d\n", rbd_dev->major);
5128
5129        return sprintf(buf, "(none)\n");
5130}
5131
5132static ssize_t rbd_minor_show(struct device *dev,
5133                              struct device_attribute *attr, char *buf)
5134{
5135        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5136
5137        return sprintf(buf, "%d\n", rbd_dev->minor);
5138}
5139
5140static ssize_t rbd_client_addr_show(struct device *dev,
5141                                    struct device_attribute *attr, char *buf)
5142{
5143        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5144        struct ceph_entity_addr *client_addr =
5145            ceph_client_addr(rbd_dev->rbd_client->client);
5146
5147        return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5148                       le32_to_cpu(client_addr->nonce));
5149}
5150
5151static ssize_t rbd_client_id_show(struct device *dev,
5152                                  struct device_attribute *attr, char *buf)
5153{
5154        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5155
5156        return sprintf(buf, "client%lld\n",
5157                       ceph_client_gid(rbd_dev->rbd_client->client));
5158}
5159
5160static ssize_t rbd_cluster_fsid_show(struct device *dev,
5161                                     struct device_attribute *attr, char *buf)
5162{
5163        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5164
5165        return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5166}
5167
5168static ssize_t rbd_config_info_show(struct device *dev,
5169                                    struct device_attribute *attr, char *buf)
5170{
5171        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5172
5173        if (!capable(CAP_SYS_ADMIN))
5174                return -EPERM;
5175
5176        return sprintf(buf, "%s\n", rbd_dev->config_info);
5177}
5178
5179static ssize_t rbd_pool_show(struct device *dev,
5180                             struct device_attribute *attr, char *buf)
5181{
5182        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5183
5184        return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5185}
5186
5187static ssize_t rbd_pool_id_show(struct device *dev,
5188                             struct device_attribute *attr, char *buf)
5189{
5190        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5191
5192        return sprintf(buf, "%llu\n",
5193                        (unsigned long long) rbd_dev->spec->pool_id);
5194}
5195
5196static ssize_t rbd_pool_ns_show(struct device *dev,
5197                                struct device_attribute *attr, char *buf)
5198{
5199        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5200
5201        return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5202}
5203
5204static ssize_t rbd_name_show(struct device *dev,
5205                             struct device_attribute *attr, char *buf)
5206{
5207        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5208
5209        if (rbd_dev->spec->image_name)
5210                return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5211
5212        return sprintf(buf, "(unknown)\n");
5213}
5214
5215static ssize_t rbd_image_id_show(struct device *dev,
5216                             struct device_attribute *attr, char *buf)
5217{
5218        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5219
5220        return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5221}
5222
5223/*
5224 * Shows the name of the currently-mapped snapshot (or
5225 * RBD_SNAP_HEAD_NAME for the base image).
5226 */
5227static ssize_t rbd_snap_show(struct device *dev,
5228                             struct device_attribute *attr,
5229                             char *buf)
5230{
5231        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5232
5233        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5234}
5235
5236static ssize_t rbd_snap_id_show(struct device *dev,
5237                                struct device_attribute *attr, char *buf)
5238{
5239        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5240
5241        return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5242}
5243
5244/*
5245 * For a v2 image, shows the chain of parent images, separated by empty
5246 * lines.  For v1 images or if there is no parent, shows "(no parent
5247 * image)".
5248 */
5249static ssize_t rbd_parent_show(struct device *dev,
5250                               struct device_attribute *attr,
5251                               char *buf)
5252{
5253        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5254        ssize_t count = 0;
5255
5256        if (!rbd_dev->parent)
5257                return sprintf(buf, "(no parent image)\n");
5258
5259        for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5260                struct rbd_spec *spec = rbd_dev->parent_spec;
5261
5262                count += sprintf(&buf[count], "%s"
5263                            "pool_id %llu\npool_name %s\n"
5264                            "pool_ns %s\n"
5265                            "image_id %s\nimage_name %s\n"
5266                            "snap_id %llu\nsnap_name %s\n"
5267                            "overlap %llu\n",
5268                            !count ? "" : "\n", /* first? */
5269                            spec->pool_id, spec->pool_name,
5270                            spec->pool_ns ?: "",
5271                            spec->image_id, spec->image_name ?: "(unknown)",
5272                            spec->snap_id, spec->snap_name,
5273                            rbd_dev->parent_overlap);
5274        }
5275
5276        return count;
5277}
5278
5279static ssize_t rbd_image_refresh(struct device *dev,
5280                                 struct device_attribute *attr,
5281                                 const char *buf,
5282                                 size_t size)
5283{
5284        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5285        int ret;
5286
5287        if (!capable(CAP_SYS_ADMIN))
5288                return -EPERM;
5289
5290        ret = rbd_dev_refresh(rbd_dev);
5291        if (ret)
5292                return ret;
5293
5294        return size;
5295}
5296
5297static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5298static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5299static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5300static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5301static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5302static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5303static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5304static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5305static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5306static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5307static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5308static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5309static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5310static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5311static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5312static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5313static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5314
5315static struct attribute *rbd_attrs[] = {
5316        &dev_attr_size.attr,
5317        &dev_attr_features.attr,
5318        &dev_attr_major.attr,
5319        &dev_attr_minor.attr,
5320        &dev_attr_client_addr.attr,
5321        &dev_attr_client_id.attr,
5322        &dev_attr_cluster_fsid.attr,
5323        &dev_attr_config_info.attr,
5324        &dev_attr_pool.attr,
5325        &dev_attr_pool_id.attr,
5326        &dev_attr_pool_ns.attr,
5327        &dev_attr_name.attr,
5328        &dev_attr_image_id.attr,
5329        &dev_attr_current_snap.attr,
5330        &dev_attr_snap_id.attr,
5331        &dev_attr_parent.attr,
5332        &dev_attr_refresh.attr,
5333        NULL
5334};
5335
5336static struct attribute_group rbd_attr_group = {
5337        .attrs = rbd_attrs,
5338};
5339
5340static const struct attribute_group *rbd_attr_groups[] = {
5341        &rbd_attr_group,
5342        NULL
5343};
5344
5345static void rbd_dev_release(struct device *dev);
5346
5347static const struct device_type rbd_device_type = {
5348        .name           = "rbd",
5349        .groups         = rbd_attr_groups,
5350        .release        = rbd_dev_release,
5351};
5352
5353static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5354{
5355        kref_get(&spec->kref);
5356
5357        return spec;
5358}
5359
5360static void rbd_spec_free(struct kref *kref);
5361static void rbd_spec_put(struct rbd_spec *spec)
5362{
5363        if (spec)
5364                kref_put(&spec->kref, rbd_spec_free);
5365}
5366
5367static struct rbd_spec *rbd_spec_alloc(void)
5368{
5369        struct rbd_spec *spec;
5370
5371        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5372        if (!spec)
5373                return NULL;
5374
5375        spec->pool_id = CEPH_NOPOOL;
5376        spec->snap_id = CEPH_NOSNAP;
5377        kref_init(&spec->kref);
5378
5379        return spec;
5380}
5381
5382static void rbd_spec_free(struct kref *kref)
5383{
5384        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5385
5386        kfree(spec->pool_name);
5387        kfree(spec->pool_ns);
5388        kfree(spec->image_id);
5389        kfree(spec->image_name);
5390        kfree(spec->snap_name);
5391        kfree(spec);
5392}
5393
5394static void rbd_dev_free(struct rbd_device *rbd_dev)
5395{
5396        WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5397        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5398
5399        ceph_oid_destroy(&rbd_dev->header_oid);
5400        ceph_oloc_destroy(&rbd_dev->header_oloc);
5401        kfree(rbd_dev->config_info);
5402
5403        rbd_put_client(rbd_dev->rbd_client);
5404        rbd_spec_put(rbd_dev->spec);
5405        kfree(rbd_dev->opts);
5406        kfree(rbd_dev);
5407}
5408
5409static void rbd_dev_release(struct device *dev)
5410{
5411        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5412        bool need_put = !!rbd_dev->opts;
5413
5414        if (need_put) {
5415                destroy_workqueue(rbd_dev->task_wq);
5416                ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5417        }
5418
5419        rbd_dev_free(rbd_dev);
5420
5421        /*
5422         * This is racy, but way better than putting module outside of
5423         * the release callback.  The race window is pretty small, so
5424         * doing something similar to dm (dm-builtin.c) is overkill.
5425         */
5426        if (need_put)
5427                module_put(THIS_MODULE);
5428}
5429
5430static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5431                                           struct rbd_spec *spec)
5432{
5433        struct rbd_device *rbd_dev;
5434
5435        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5436        if (!rbd_dev)
5437                return NULL;
5438
5439        spin_lock_init(&rbd_dev->lock);
5440        INIT_LIST_HEAD(&rbd_dev->node);
5441        init_rwsem(&rbd_dev->header_rwsem);
5442
5443        rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5444        ceph_oid_init(&rbd_dev->header_oid);
5445        rbd_dev->header_oloc.pool = spec->pool_id;
5446        if (spec->pool_ns) {
5447                WARN_ON(!*spec->pool_ns);
5448                rbd_dev->header_oloc.pool_ns =
5449                    ceph_find_or_create_string(spec->pool_ns,
5450                                               strlen(spec->pool_ns));
5451        }
5452
5453        mutex_init(&rbd_dev->watch_mutex);
5454        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5455        INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5456
5457        init_rwsem(&rbd_dev->lock_rwsem);
5458        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5459        INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5460        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5461        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5462        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5463        spin_lock_init(&rbd_dev->lock_lists_lock);
5464        INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5465        INIT_LIST_HEAD(&rbd_dev->running_list);
5466        init_completion(&rbd_dev->acquire_wait);
5467        init_completion(&rbd_dev->releasing_wait);
5468
5469        spin_lock_init(&rbd_dev->object_map_lock);
5470
5471        rbd_dev->dev.bus = &rbd_bus_type;
5472        rbd_dev->dev.type = &rbd_device_type;
5473        rbd_dev->dev.parent = &rbd_root_dev;
5474        device_initialize(&rbd_dev->dev);
5475
5476        rbd_dev->rbd_client = rbdc;
5477        rbd_dev->spec = spec;
5478
5479        return rbd_dev;
5480}
5481
5482/*
5483 * Create a mapping rbd_dev.
5484 */
5485static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5486                                         struct rbd_spec *spec,
5487                                         struct rbd_options *opts)
5488{
5489        struct rbd_device *rbd_dev;
5490
5491        rbd_dev = __rbd_dev_create(rbdc, spec);
5492        if (!rbd_dev)
5493                return NULL;
5494
5495        rbd_dev->opts = opts;
5496
5497        /* get an id and fill in device name */
5498        rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5499                                         minor_to_rbd_dev_id(1 << MINORBITS),
5500                                         GFP_KERNEL);
5501        if (rbd_dev->dev_id < 0)
5502                goto fail_rbd_dev;
5503
5504        sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5505        rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5506                                                   rbd_dev->name);
5507        if (!rbd_dev->task_wq)
5508                goto fail_dev_id;
5509
5510        /* we have a ref from do_rbd_add() */
5511        __module_get(THIS_MODULE);
5512
5513        dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5514        return rbd_dev;
5515
5516fail_dev_id:
5517        ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5518fail_rbd_dev:
5519        rbd_dev_free(rbd_dev);
5520        return NULL;
5521}
5522
5523static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5524{
5525        if (rbd_dev)
5526                put_device(&rbd_dev->dev);
5527}
5528
5529/*
5530 * Get the size and object order for an image snapshot, or if
5531 * snap_id is CEPH_NOSNAP, gets this information for the base
5532 * image.
5533 */
5534static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5535                                u8 *order, u64 *snap_size)
5536{
5537        __le64 snapid = cpu_to_le64(snap_id);
5538        int ret;
5539        struct {
5540                u8 order;
5541                __le64 size;
5542        } __attribute__ ((packed)) size_buf = { 0 };
5543
5544        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5545                                  &rbd_dev->header_oloc, "get_size",
5546                                  &snapid, sizeof(snapid),
5547                                  &size_buf, sizeof(size_buf));
5548        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5549        if (ret < 0)
5550                return ret;
5551        if (ret < sizeof (size_buf))
5552                return -ERANGE;
5553
5554        if (order) {
5555                *order = size_buf.order;
5556                dout("  order %u", (unsigned int)*order);
5557        }
5558        *snap_size = le64_to_cpu(size_buf.size);
5559
5560        dout("  snap_id 0x%016llx snap_size = %llu\n",
5561                (unsigned long long)snap_id,
5562                (unsigned long long)*snap_size);
5563
5564        return 0;
5565}
5566
5567static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5568{
5569        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5570                                        &rbd_dev->header.obj_order,
5571                                        &rbd_dev->header.image_size);
5572}
5573
5574static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5575{
5576        size_t size;
5577        void *reply_buf;
5578        int ret;
5579        void *p;
5580
5581        /* Response will be an encoded string, which includes a length */
5582        size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5583        reply_buf = kzalloc(size, GFP_KERNEL);
5584        if (!reply_buf)
5585                return -ENOMEM;
5586
5587        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5588                                  &rbd_dev->header_oloc, "get_object_prefix",
5589                                  NULL, 0, reply_buf, size);
5590        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5591        if (ret < 0)
5592                goto out;
5593
5594        p = reply_buf;
5595        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5596                                                p + ret, NULL, GFP_NOIO);
5597        ret = 0;
5598
5599        if (IS_ERR(rbd_dev->header.object_prefix)) {
5600                ret = PTR_ERR(rbd_dev->header.object_prefix);
5601                rbd_dev->header.object_prefix = NULL;
5602        } else {
5603                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5604        }
5605out:
5606        kfree(reply_buf);
5607
5608        return ret;
5609}
5610
5611static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5612                                     bool read_only, u64 *snap_features)
5613{
5614        struct {
5615                __le64 snap_id;
5616                u8 read_only;
5617        } features_in;
5618        struct {
5619                __le64 features;
5620                __le64 incompat;
5621        } __attribute__ ((packed)) features_buf = { 0 };
5622        u64 unsup;
5623        int ret;
5624
5625        features_in.snap_id = cpu_to_le64(snap_id);
5626        features_in.read_only = read_only;
5627
5628        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5629                                  &rbd_dev->header_oloc, "get_features",
5630                                  &features_in, sizeof(features_in),
5631                                  &features_buf, sizeof(features_buf));
5632        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5633        if (ret < 0)
5634                return ret;
5635        if (ret < sizeof (features_buf))
5636                return -ERANGE;
5637
5638        unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5639        if (unsup) {
5640                rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5641                         unsup);
5642                return -ENXIO;
5643        }
5644
5645        *snap_features = le64_to_cpu(features_buf.features);
5646
5647        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5648                (unsigned long long)snap_id,
5649                (unsigned long long)*snap_features,
5650                (unsigned long long)le64_to_cpu(features_buf.incompat));
5651
5652        return 0;
5653}
5654
5655static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5656{
5657        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5658                                         rbd_is_ro(rbd_dev),
5659                                         &rbd_dev->header.features);
5660}
5661
5662/*
5663 * These are generic image flags, but since they are used only for
5664 * object map, store them in rbd_dev->object_map_flags.
5665 *
5666 * For the same reason, this function is called only on object map
5667 * (re)load and not on header refresh.
5668 */
5669static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5670{
5671        __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5672        __le64 flags;
5673        int ret;
5674
5675        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5676                                  &rbd_dev->header_oloc, "get_flags",
5677                                  &snapid, sizeof(snapid),
5678                                  &flags, sizeof(flags));
5679        if (ret < 0)
5680                return ret;
5681        if (ret < sizeof(flags))
5682                return -EBADMSG;
5683
5684        rbd_dev->object_map_flags = le64_to_cpu(flags);
5685        return 0;
5686}
5687
5688struct parent_image_info {
5689        u64             pool_id;
5690        const char      *pool_ns;
5691        const char      *image_id;
5692        u64             snap_id;
5693
5694        bool            has_overlap;
5695        u64             overlap;
5696};
5697
5698/*
5699 * The caller is responsible for @pii.
5700 */
5701static int decode_parent_image_spec(void **p, void *end,
5702                                    struct parent_image_info *pii)
5703{
5704        u8 struct_v;
5705        u32 struct_len;
5706        int ret;
5707
5708        ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5709                                  &struct_v, &struct_len);
5710        if (ret)
5711                return ret;
5712
5713        ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5714        pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5715        if (IS_ERR(pii->pool_ns)) {
5716                ret = PTR_ERR(pii->pool_ns);
5717                pii->pool_ns = NULL;
5718                return ret;
5719        }
5720        pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5721        if (IS_ERR(pii->image_id)) {
5722                ret = PTR_ERR(pii->image_id);
5723                pii->image_id = NULL;
5724                return ret;
5725        }
5726        ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5727        return 0;
5728
5729e_inval:
5730        return -EINVAL;
5731}
5732
5733static int __get_parent_info(struct rbd_device *rbd_dev,
5734                             struct page *req_page,
5735                             struct page *reply_page,
5736                             struct parent_image_info *pii)
5737{
5738        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5739        size_t reply_len = PAGE_SIZE;
5740        void *p, *end;
5741        int ret;
5742
5743        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5744                             "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5745                             req_page, sizeof(u64), &reply_page, &reply_len);
5746        if (ret)
5747                return ret == -EOPNOTSUPP ? 1 : ret;
5748
5749        p = page_address(reply_page);
5750        end = p + reply_len;
5751        ret = decode_parent_image_spec(&p, end, pii);
5752        if (ret)
5753                return ret;
5754
5755        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5756                             "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5757                             req_page, sizeof(u64), &reply_page, &reply_len);
5758        if (ret)
5759                return ret;
5760
5761        p = page_address(reply_page);
5762        end = p + reply_len;
5763        ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5764        if (pii->has_overlap)
5765                ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5766
5767        return 0;
5768
5769e_inval:
5770        return -EINVAL;
5771}
5772
5773/*
5774 * The caller is responsible for @pii.
5775 */
5776static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5777                                    struct page *req_page,
5778                                    struct page *reply_page,
5779                                    struct parent_image_info *pii)
5780{
5781        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5782        size_t reply_len = PAGE_SIZE;
5783        void *p, *end;
5784        int ret;
5785
5786        ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5787                             "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5788                             req_page, sizeof(u64), &reply_page, &reply_len);
5789        if (ret)
5790                return ret;
5791
5792        p = page_address(reply_page);
5793        end = p + reply_len;
5794        ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5795        pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5796        if (IS_ERR(pii->image_id)) {
5797                ret = PTR_ERR(pii->image_id);
5798                pii->image_id = NULL;
5799                return ret;
5800        }
5801        ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5802        pii->has_overlap = true;
5803        ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5804
5805        return 0;
5806
5807e_inval:
5808        return -EINVAL;
5809}
5810
5811static int get_parent_info(struct rbd_device *rbd_dev,
5812                           struct parent_image_info *pii)
5813{
5814        struct page *req_page, *reply_page;
5815        void *p;
5816        int ret;
5817
5818        req_page = alloc_page(GFP_KERNEL);
5819        if (!req_page)
5820                return -ENOMEM;
5821
5822        reply_page = alloc_page(GFP_KERNEL);
5823        if (!reply_page) {
5824                __free_page(req_page);
5825                return -ENOMEM;
5826        }
5827
5828        p = page_address(req_page);
5829        ceph_encode_64(&p, rbd_dev->spec->snap_id);
5830        ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5831        if (ret > 0)
5832                ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5833                                               pii);
5834
5835        __free_page(req_page);
5836        __free_page(reply_page);
5837        return ret;
5838}
5839
5840static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5841{
5842        struct rbd_spec *parent_spec;
5843        struct parent_image_info pii = { 0 };
5844        int ret;
5845
5846        parent_spec = rbd_spec_alloc();
5847        if (!parent_spec)
5848                return -ENOMEM;
5849
5850        ret = get_parent_info(rbd_dev, &pii);
5851        if (ret)
5852                goto out_err;
5853
5854        dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5855             __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5856             pii.has_overlap, pii.overlap);
5857
5858        if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5859                /*
5860                 * Either the parent never existed, or we have
5861                 * record of it but the image got flattened so it no
5862                 * longer has a parent.  When the parent of a
5863                 * layered image disappears we immediately set the
5864                 * overlap to 0.  The effect of this is that all new
5865                 * requests will be treated as if the image had no
5866                 * parent.
5867                 *
5868                 * If !pii.has_overlap, the parent image spec is not
5869                 * applicable.  It's there to avoid duplication in each
5870                 * snapshot record.
5871                 */
5872                if (rbd_dev->parent_overlap) {
5873                        rbd_dev->parent_overlap = 0;
5874                        rbd_dev_parent_put(rbd_dev);
5875                        pr_info("%s: clone image has been flattened\n",
5876                                rbd_dev->disk->disk_name);
5877                }
5878
5879                goto out;       /* No parent?  No problem. */
5880        }
5881
5882        /* The ceph file layout needs to fit pool id in 32 bits */
5883
5884        ret = -EIO;
5885        if (pii.pool_id > (u64)U32_MAX) {
5886                rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5887                        (unsigned long long)pii.pool_id, U32_MAX);
5888                goto out_err;
5889        }
5890
5891        /*
5892         * The parent won't change (except when the clone is
5893         * flattened, already handled that).  So we only need to
5894         * record the parent spec we have not already done so.
5895         */
5896        if (!rbd_dev->parent_spec) {
5897                parent_spec->pool_id = pii.pool_id;
5898                if (pii.pool_ns && *pii.pool_ns) {
5899                        parent_spec->pool_ns = pii.pool_ns;
5900                        pii.pool_ns = NULL;
5901                }
5902                parent_spec->image_id = pii.image_id;
5903                pii.image_id = NULL;
5904                parent_spec->snap_id = pii.snap_id;
5905
5906                rbd_dev->parent_spec = parent_spec;
5907                parent_spec = NULL;     /* rbd_dev now owns this */
5908        }
5909
5910        /*
5911         * We always update the parent overlap.  If it's zero we issue
5912         * a warning, as we will proceed as if there was no parent.
5913         */
5914        if (!pii.overlap) {
5915                if (parent_spec) {
5916                        /* refresh, careful to warn just once */
5917                        if (rbd_dev->parent_overlap)
5918                                rbd_warn(rbd_dev,
5919                                    "clone now standalone (overlap became 0)");
5920                } else {
5921                        /* initial probe */
5922                        rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5923                }
5924        }
5925        rbd_dev->parent_overlap = pii.overlap;
5926
5927out:
5928        ret = 0;
5929out_err:
5930        kfree(pii.pool_ns);
5931        kfree(pii.image_id);
5932        rbd_spec_put(parent_spec);
5933        return ret;
5934}
5935
5936static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5937{
5938        struct {
5939                __le64 stripe_unit;
5940                __le64 stripe_count;
5941        } __attribute__ ((packed)) striping_info_buf = { 0 };
5942        size_t size = sizeof (striping_info_buf);
5943        void *p;
5944        int ret;
5945
5946        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5947                                &rbd_dev->header_oloc, "get_stripe_unit_count",
5948                                NULL, 0, &striping_info_buf, size);
5949        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5950        if (ret < 0)
5951                return ret;
5952        if (ret < size)
5953                return -ERANGE;
5954
5955        p = &striping_info_buf;
5956        rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5957        rbd_dev->header.stripe_count = ceph_decode_64(&p);
5958        return 0;
5959}
5960
5961static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5962{
5963        __le64 data_pool_id;
5964        int ret;
5965
5966        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5967                                  &rbd_dev->header_oloc, "get_data_pool",
5968                                  NULL, 0, &data_pool_id, sizeof(data_pool_id));
5969        if (ret < 0)
5970                return ret;
5971        if (ret < sizeof(data_pool_id))
5972                return -EBADMSG;
5973
5974        rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5975        WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5976        return 0;
5977}
5978
5979static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5980{
5981        CEPH_DEFINE_OID_ONSTACK(oid);
5982        size_t image_id_size;
5983        char *image_id;
5984        void *p;
5985        void *end;
5986        size_t size;
5987        void *reply_buf = NULL;
5988        size_t len = 0;
5989        char *image_name = NULL;
5990        int ret;
5991
5992        rbd_assert(!rbd_dev->spec->image_name);
5993
5994        len = strlen(rbd_dev->spec->image_id);
5995        image_id_size = sizeof (__le32) + len;
5996        image_id = kmalloc(image_id_size, GFP_KERNEL);
5997        if (!image_id)
5998                return NULL;
5999
6000        p = image_id;
6001        end = image_id + image_id_size;
6002        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6003
6004        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6005        reply_buf = kmalloc(size, GFP_KERNEL);
6006        if (!reply_buf)
6007                goto out;
6008
6009        ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6010        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6011                                  "dir_get_name", image_id, image_id_size,
6012                                  reply_buf, size);
6013        if (ret < 0)
6014                goto out;
6015        p = reply_buf;
6016        end = reply_buf + ret;
6017
6018        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6019        if (IS_ERR(image_name))
6020                image_name = NULL;
6021        else
6022                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6023out:
6024        kfree(reply_buf);
6025        kfree(image_id);
6026
6027        return image_name;
6028}
6029
6030static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6031{
6032        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6033        const char *snap_name;
6034        u32 which = 0;
6035
6036        /* Skip over names until we find the one we are looking for */
6037
6038        snap_name = rbd_dev->header.snap_names;
6039        while (which < snapc->num_snaps) {
6040                if (!strcmp(name, snap_name))
6041                        return snapc->snaps[which];
6042                snap_name += strlen(snap_name) + 1;
6043                which++;
6044        }
6045        return CEPH_NOSNAP;
6046}
6047
6048static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6049{
6050        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6051        u32 which;
6052        bool found = false;
6053        u64 snap_id;
6054
6055        for (which = 0; !found && which < snapc->num_snaps; which++) {
6056                const char *snap_name;
6057
6058                snap_id = snapc->snaps[which];
6059                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6060                if (IS_ERR(snap_name)) {
6061                        /* ignore no-longer existing snapshots */
6062                        if (PTR_ERR(snap_name) == -ENOENT)
6063                                continue;
6064                        else
6065                                break;
6066                }
6067                found = !strcmp(name, snap_name);
6068                kfree(snap_name);
6069        }
6070        return found ? snap_id : CEPH_NOSNAP;
6071}
6072
6073/*
6074 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6075 * no snapshot by that name is found, or if an error occurs.
6076 */
6077static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6078{
6079        if (rbd_dev->image_format == 1)
6080                return rbd_v1_snap_id_by_name(rbd_dev, name);
6081
6082        return rbd_v2_snap_id_by_name(rbd_dev, name);
6083}
6084
6085/*
6086 * An image being mapped will have everything but the snap id.
6087 */
6088static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6089{
6090        struct rbd_spec *spec = rbd_dev->spec;
6091
6092        rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6093        rbd_assert(spec->image_id && spec->image_name);
6094        rbd_assert(spec->snap_name);
6095
6096        if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6097                u64 snap_id;
6098
6099                snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6100                if (snap_id == CEPH_NOSNAP)
6101                        return -ENOENT;
6102
6103                spec->snap_id = snap_id;
6104        } else {
6105                spec->snap_id = CEPH_NOSNAP;
6106        }
6107
6108        return 0;
6109}
6110
6111/*
6112 * A parent image will have all ids but none of the names.
6113 *
6114 * All names in an rbd spec are dynamically allocated.  It's OK if we
6115 * can't figure out the name for an image id.
6116 */
6117static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6118{
6119        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6120        struct rbd_spec *spec = rbd_dev->spec;
6121        const char *pool_name;
6122        const char *image_name;
6123        const char *snap_name;
6124        int ret;
6125
6126        rbd_assert(spec->pool_id != CEPH_NOPOOL);
6127        rbd_assert(spec->image_id);
6128        rbd_assert(spec->snap_id != CEPH_NOSNAP);
6129
6130        /* Get the pool name; we have to make our own copy of this */
6131
6132        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6133        if (!pool_name) {
6134                rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6135                return -EIO;
6136        }
6137        pool_name = kstrdup(pool_name, GFP_KERNEL);
6138        if (!pool_name)
6139                return -ENOMEM;
6140
6141        /* Fetch the image name; tolerate failure here */
6142
6143        image_name = rbd_dev_image_name(rbd_dev);
6144        if (!image_name)
6145                rbd_warn(rbd_dev, "unable to get image name");
6146
6147        /* Fetch the snapshot name */
6148
6149        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6150        if (IS_ERR(snap_name)) {
6151                ret = PTR_ERR(snap_name);
6152                goto out_err;
6153        }
6154
6155        spec->pool_name = pool_name;
6156        spec->image_name = image_name;
6157        spec->snap_name = snap_name;
6158
6159        return 0;
6160
6161out_err:
6162        kfree(image_name);
6163        kfree(pool_name);
6164        return ret;
6165}
6166
6167static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6168{
6169        size_t size;
6170        int ret;
6171        void *reply_buf;
6172        void *p;
6173        void *end;
6174        u64 seq;
6175        u32 snap_count;
6176        struct ceph_snap_context *snapc;
6177        u32 i;
6178
6179        /*
6180         * We'll need room for the seq value (maximum snapshot id),
6181         * snapshot count, and array of that many snapshot ids.
6182         * For now we have a fixed upper limit on the number we're
6183         * prepared to receive.
6184         */
6185        size = sizeof (__le64) + sizeof (__le32) +
6186                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
6187        reply_buf = kzalloc(size, GFP_KERNEL);
6188        if (!reply_buf)
6189                return -ENOMEM;
6190
6191        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6192                                  &rbd_dev->header_oloc, "get_snapcontext",
6193                                  NULL, 0, reply_buf, size);
6194        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6195        if (ret < 0)
6196                goto out;
6197
6198        p = reply_buf;
6199        end = reply_buf + ret;
6200        ret = -ERANGE;
6201        ceph_decode_64_safe(&p, end, seq, out);
6202        ceph_decode_32_safe(&p, end, snap_count, out);
6203
6204        /*
6205         * Make sure the reported number of snapshot ids wouldn't go
6206         * beyond the end of our buffer.  But before checking that,
6207         * make sure the computed size of the snapshot context we
6208         * allocate is representable in a size_t.
6209         */
6210        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6211                                 / sizeof (u64)) {
6212                ret = -EINVAL;
6213                goto out;
6214        }
6215        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6216                goto out;
6217        ret = 0;
6218
6219        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6220        if (!snapc) {
6221                ret = -ENOMEM;
6222                goto out;
6223        }
6224        snapc->seq = seq;
6225        for (i = 0; i < snap_count; i++)
6226                snapc->snaps[i] = ceph_decode_64(&p);
6227
6228        ceph_put_snap_context(rbd_dev->header.snapc);
6229        rbd_dev->header.snapc = snapc;
6230
6231        dout("  snap context seq = %llu, snap_count = %u\n",
6232                (unsigned long long)seq, (unsigned int)snap_count);
6233out:
6234        kfree(reply_buf);
6235
6236        return ret;
6237}
6238
6239static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6240                                        u64 snap_id)
6241{
6242        size_t size;
6243        void *reply_buf;
6244        __le64 snapid;
6245        int ret;
6246        void *p;
6247        void *end;
6248        char *snap_name;
6249
6250        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6251        reply_buf = kmalloc(size, GFP_KERNEL);
6252        if (!reply_buf)
6253                return ERR_PTR(-ENOMEM);
6254
6255        snapid = cpu_to_le64(snap_id);
6256        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6257                                  &rbd_dev->header_oloc, "get_snapshot_name",
6258                                  &snapid, sizeof(snapid), reply_buf, size);
6259        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6260        if (ret < 0) {
6261                snap_name = ERR_PTR(ret);
6262                goto out;
6263        }
6264
6265        p = reply_buf;
6266        end = reply_buf + ret;
6267        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6268        if (IS_ERR(snap_name))
6269                goto out;
6270
6271        dout("  snap_id 0x%016llx snap_name = %s\n",
6272                (unsigned long long)snap_id, snap_name);
6273out:
6274        kfree(reply_buf);
6275
6276        return snap_name;
6277}
6278
6279static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6280{
6281        bool first_time = rbd_dev->header.object_prefix == NULL;
6282        int ret;
6283
6284        ret = rbd_dev_v2_image_size(rbd_dev);
6285        if (ret)
6286                return ret;
6287
6288        if (first_time) {
6289                ret = rbd_dev_v2_header_onetime(rbd_dev);
6290                if (ret)
6291                        return ret;
6292        }
6293
6294        ret = rbd_dev_v2_snap_context(rbd_dev);
6295        if (ret && first_time) {
6296                kfree(rbd_dev->header.object_prefix);
6297                rbd_dev->header.object_prefix = NULL;
6298        }
6299
6300        return ret;
6301}
6302
6303static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6304{
6305        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6306
6307        if (rbd_dev->image_format == 1)
6308                return rbd_dev_v1_header_info(rbd_dev);
6309
6310        return rbd_dev_v2_header_info(rbd_dev);
6311}
6312
6313/*
6314 * Skips over white space at *buf, and updates *buf to point to the
6315 * first found non-space character (if any). Returns the length of
6316 * the token (string of non-white space characters) found.  Note
6317 * that *buf must be terminated with '\0'.
6318 */
6319static inline size_t next_token(const char **buf)
6320{
6321        /*
6322        * These are the characters that produce nonzero for
6323        * isspace() in the "C" and "POSIX" locales.
6324        */
6325        const char *spaces = " \f\n\r\t\v";
6326
6327        *buf += strspn(*buf, spaces);   /* Find start of token */
6328
6329        return strcspn(*buf, spaces);   /* Return token length */
6330}
6331
6332/*
6333 * Finds the next token in *buf, dynamically allocates a buffer big
6334 * enough to hold a copy of it, and copies the token into the new
6335 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6336 * that a duplicate buffer is created even for a zero-length token.
6337 *
6338 * Returns a pointer to the newly-allocated duplicate, or a null
6339 * pointer if memory for the duplicate was not available.  If
6340 * the lenp argument is a non-null pointer, the length of the token
6341 * (not including the '\0') is returned in *lenp.
6342 *
6343 * If successful, the *buf pointer will be updated to point beyond
6344 * the end of the found token.
6345 *
6346 * Note: uses GFP_KERNEL for allocation.
6347 */
6348static inline char *dup_token(const char **buf, size_t *lenp)
6349{
6350        char *dup;
6351        size_t len;
6352
6353        len = next_token(buf);
6354        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6355        if (!dup)
6356                return NULL;
6357        *(dup + len) = '\0';
6358        *buf += len;
6359
6360        if (lenp)
6361                *lenp = len;
6362
6363        return dup;
6364}
6365
6366/*
6367 * Parse the options provided for an "rbd add" (i.e., rbd image
6368 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6369 * and the data written is passed here via a NUL-terminated buffer.
6370 * Returns 0 if successful or an error code otherwise.
6371 *
6372 * The information extracted from these options is recorded in
6373 * the other parameters which return dynamically-allocated
6374 * structures:
6375 *  ceph_opts
6376 *      The address of a pointer that will refer to a ceph options
6377 *      structure.  Caller must release the returned pointer using
6378 *      ceph_destroy_options() when it is no longer needed.
6379 *  rbd_opts
6380 *      Address of an rbd options pointer.  Fully initialized by
6381 *      this function; caller must release with kfree().
6382 *  spec
6383 *      Address of an rbd image specification pointer.  Fully
6384 *      initialized by this function based on parsed options.
6385 *      Caller must release with rbd_spec_put().
6386 *
6387 * The options passed take this form:
6388 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
6389 * where:
6390 *  <mon_addrs>
6391 *      A comma-separated list of one or more monitor addresses.
6392 *      A monitor address is an ip address, optionally followed
6393 *      by a port number (separated by a colon).
6394 *        I.e.:  ip1[:port1][,ip2[:port2]...]
6395 *  <options>
6396 *      A comma-separated list of ceph and/or rbd options.
6397 *  <pool_name>
6398 *      The name of the rados pool containing the rbd image.
6399 *  <image_name>
6400 *      The name of the image in that pool to map.
6401 *  <snap_id>
6402 *      An optional snapshot id.  If provided, the mapping will
6403 *      present data from the image at the time that snapshot was
6404 *      created.  The image head is used if no snapshot id is
6405 *      provided.  Snapshot mappings are always read-only.
6406 */
6407static int rbd_add_parse_args(const char *buf,
6408                                struct ceph_options **ceph_opts,
6409                                struct rbd_options **opts,
6410                                struct rbd_spec **rbd_spec)
6411{
6412        size_t len;
6413        char *options;
6414        const char *mon_addrs;
6415        char *snap_name;
6416        size_t mon_addrs_size;
6417        struct parse_rbd_opts_ctx pctx = { 0 };
6418        struct ceph_options *copts;
6419        int ret;
6420
6421        /* The first four tokens are required */
6422
6423        len = next_token(&buf);
6424        if (!len) {
6425                rbd_warn(NULL, "no monitor address(es) provided");
6426                return -EINVAL;
6427        }
6428        mon_addrs = buf;
6429        mon_addrs_size = len + 1;
6430        buf += len;
6431
6432        ret = -EINVAL;
6433        options = dup_token(&buf, NULL);
6434        if (!options)
6435                return -ENOMEM;
6436        if (!*options) {
6437                rbd_warn(NULL, "no options provided");
6438                goto out_err;
6439        }
6440
6441        pctx.spec = rbd_spec_alloc();
6442        if (!pctx.spec)
6443                goto out_mem;
6444
6445        pctx.spec->pool_name = dup_token(&buf, NULL);
6446        if (!pctx.spec->pool_name)
6447                goto out_mem;
6448        if (!*pctx.spec->pool_name) {
6449                rbd_warn(NULL, "no pool name provided");
6450                goto out_err;
6451        }
6452
6453        pctx.spec->image_name = dup_token(&buf, NULL);
6454        if (!pctx.spec->image_name)
6455                goto out_mem;
6456        if (!*pctx.spec->image_name) {
6457                rbd_warn(NULL, "no image name provided");
6458                goto out_err;
6459        }
6460
6461        /*
6462         * Snapshot name is optional; default is to use "-"
6463         * (indicating the head/no snapshot).
6464         */
6465        len = next_token(&buf);
6466        if (!len) {
6467                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6468                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6469        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6470                ret = -ENAMETOOLONG;
6471                goto out_err;
6472        }
6473        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6474        if (!snap_name)
6475                goto out_mem;
6476        *(snap_name + len) = '\0';
6477        pctx.spec->snap_name = snap_name;
6478
6479        /* Initialize all rbd options to the defaults */
6480
6481        pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6482        if (!pctx.opts)
6483                goto out_mem;
6484
6485        pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6486        pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6487        pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6488        pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6489        pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6490        pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6491        pctx.opts->trim = RBD_TRIM_DEFAULT;
6492
6493        copts = ceph_parse_options(options, mon_addrs,
6494                                   mon_addrs + mon_addrs_size - 1,
6495                                   parse_rbd_opts_token, &pctx);
6496        if (IS_ERR(copts)) {
6497                ret = PTR_ERR(copts);
6498                goto out_err;
6499        }
6500        kfree(options);
6501
6502        *ceph_opts = copts;
6503        *opts = pctx.opts;
6504        *rbd_spec = pctx.spec;
6505
6506        return 0;
6507out_mem:
6508        ret = -ENOMEM;
6509out_err:
6510        kfree(pctx.opts);
6511        rbd_spec_put(pctx.spec);
6512        kfree(options);
6513
6514        return ret;
6515}
6516
6517static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6518{
6519        down_write(&rbd_dev->lock_rwsem);
6520        if (__rbd_is_lock_owner(rbd_dev))
6521                __rbd_release_lock(rbd_dev);
6522        up_write(&rbd_dev->lock_rwsem);
6523}
6524
6525/*
6526 * If the wait is interrupted, an error is returned even if the lock
6527 * was successfully acquired.  rbd_dev_image_unlock() will release it
6528 * if needed.
6529 */
6530static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6531{
6532        long ret;
6533
6534        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6535                if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6536                        return 0;
6537
6538                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6539                return -EINVAL;
6540        }
6541
6542        if (rbd_is_ro(rbd_dev))
6543                return 0;
6544
6545        rbd_assert(!rbd_is_lock_owner(rbd_dev));
6546        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6547        ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6548                            ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6549        if (ret > 0) {
6550                ret = rbd_dev->acquire_err;
6551        } else {
6552                cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6553                if (!ret)
6554                        ret = -ETIMEDOUT;
6555        }
6556
6557        if (ret) {
6558                rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6559                return ret;
6560        }
6561
6562        /*
6563         * The lock may have been released by now, unless automatic lock
6564         * transitions are disabled.
6565         */
6566        rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6567        return 0;
6568}
6569
6570/*
6571 * An rbd format 2 image has a unique identifier, distinct from the
6572 * name given to it by the user.  Internally, that identifier is
6573 * what's used to specify the names of objects related to the image.
6574 *
6575 * A special "rbd id" object is used to map an rbd image name to its
6576 * id.  If that object doesn't exist, then there is no v2 rbd image
6577 * with the supplied name.
6578 *
6579 * This function will record the given rbd_dev's image_id field if
6580 * it can be determined, and in that case will return 0.  If any
6581 * errors occur a negative errno will be returned and the rbd_dev's
6582 * image_id field will be unchanged (and should be NULL).
6583 */
6584static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6585{
6586        int ret;
6587        size_t size;
6588        CEPH_DEFINE_OID_ONSTACK(oid);
6589        void *response;
6590        char *image_id;
6591
6592        /*
6593         * When probing a parent image, the image id is already
6594         * known (and the image name likely is not).  There's no
6595         * need to fetch the image id again in this case.  We
6596         * do still need to set the image format though.
6597         */
6598        if (rbd_dev->spec->image_id) {
6599                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6600
6601                return 0;
6602        }
6603
6604        /*
6605         * First, see if the format 2 image id file exists, and if
6606         * so, get the image's persistent id from it.
6607         */
6608        ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6609                               rbd_dev->spec->image_name);
6610        if (ret)
6611                return ret;
6612
6613        dout("rbd id object name is %s\n", oid.name);
6614
6615        /* Response will be an encoded string, which includes a length */
6616        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6617        response = kzalloc(size, GFP_NOIO);
6618        if (!response) {
6619                ret = -ENOMEM;
6620                goto out;
6621        }
6622
6623        /* If it doesn't exist we'll assume it's a format 1 image */
6624
6625        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6626                                  "get_id", NULL, 0,
6627                                  response, size);
6628        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6629        if (ret == -ENOENT) {
6630                image_id = kstrdup("", GFP_KERNEL);
6631                ret = image_id ? 0 : -ENOMEM;
6632                if (!ret)
6633                        rbd_dev->image_format = 1;
6634        } else if (ret >= 0) {
6635                void *p = response;
6636
6637                image_id = ceph_extract_encoded_string(&p, p + ret,
6638                                                NULL, GFP_NOIO);
6639                ret = PTR_ERR_OR_ZERO(image_id);
6640                if (!ret)
6641                        rbd_dev->image_format = 2;
6642        }
6643
6644        if (!ret) {
6645                rbd_dev->spec->image_id = image_id;
6646                dout("image_id is %s\n", image_id);
6647        }
6648out:
6649        kfree(response);
6650        ceph_oid_destroy(&oid);
6651        return ret;
6652}
6653
6654/*
6655 * Undo whatever state changes are made by v1 or v2 header info
6656 * call.
6657 */
6658static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6659{
6660        struct rbd_image_header *header;
6661
6662        rbd_dev_parent_put(rbd_dev);
6663        rbd_object_map_free(rbd_dev);
6664        rbd_dev_mapping_clear(rbd_dev);
6665
6666        /* Free dynamic fields from the header, then zero it out */
6667
6668        header = &rbd_dev->header;
6669        ceph_put_snap_context(header->snapc);
6670        kfree(header->snap_sizes);
6671        kfree(header->snap_names);
6672        kfree(header->object_prefix);
6673        memset(header, 0, sizeof (*header));
6674}
6675
6676static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6677{
6678        int ret;
6679
6680        ret = rbd_dev_v2_object_prefix(rbd_dev);
6681        if (ret)
6682                goto out_err;
6683
6684        /*
6685         * Get the and check features for the image.  Currently the
6686         * features are assumed to never change.
6687         */
6688        ret = rbd_dev_v2_features(rbd_dev);
6689        if (ret)
6690                goto out_err;
6691
6692        /* If the image supports fancy striping, get its parameters */
6693
6694        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6695                ret = rbd_dev_v2_striping_info(rbd_dev);
6696                if (ret < 0)
6697                        goto out_err;
6698        }
6699
6700        if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6701                ret = rbd_dev_v2_data_pool(rbd_dev);
6702                if (ret)
6703                        goto out_err;
6704        }
6705
6706        rbd_init_layout(rbd_dev);
6707        return 0;
6708
6709out_err:
6710        rbd_dev->header.features = 0;
6711        kfree(rbd_dev->header.object_prefix);
6712        rbd_dev->header.object_prefix = NULL;
6713        return ret;
6714}
6715
6716/*
6717 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6718 * rbd_dev_image_probe() recursion depth, which means it's also the
6719 * length of the already discovered part of the parent chain.
6720 */
6721static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6722{
6723        struct rbd_device *parent = NULL;
6724        int ret;
6725
6726        if (!rbd_dev->parent_spec)
6727                return 0;
6728
6729        if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6730                pr_info("parent chain is too long (%d)\n", depth);
6731                ret = -EINVAL;
6732                goto out_err;
6733        }
6734
6735        parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6736        if (!parent) {
6737                ret = -ENOMEM;
6738                goto out_err;
6739        }
6740
6741        /*
6742         * Images related by parent/child relationships always share
6743         * rbd_client and spec/parent_spec, so bump their refcounts.
6744         */
6745        __rbd_get_client(rbd_dev->rbd_client);
6746        rbd_spec_get(rbd_dev->parent_spec);
6747
6748        __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6749
6750        ret = rbd_dev_image_probe(parent, depth);
6751        if (ret < 0)
6752                goto out_err;
6753
6754        rbd_dev->parent = parent;
6755        atomic_set(&rbd_dev->parent_ref, 1);
6756        return 0;
6757
6758out_err:
6759        rbd_dev_unparent(rbd_dev);
6760        rbd_dev_destroy(parent);
6761        return ret;
6762}
6763
6764static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6765{
6766        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6767        rbd_free_disk(rbd_dev);
6768        if (!single_major)
6769                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6770}
6771
6772/*
6773 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6774 * upon return.
6775 */
6776static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6777{
6778        int ret;
6779
6780        /* Record our major and minor device numbers. */
6781
6782        if (!single_major) {
6783                ret = register_blkdev(0, rbd_dev->name);
6784                if (ret < 0)
6785                        goto err_out_unlock;
6786
6787                rbd_dev->major = ret;
6788                rbd_dev->minor = 0;
6789        } else {
6790                rbd_dev->major = rbd_major;
6791                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6792        }
6793
6794        /* Set up the blkdev mapping. */
6795
6796        ret = rbd_init_disk(rbd_dev);
6797        if (ret)
6798                goto err_out_blkdev;
6799
6800        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6801        set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6802
6803        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6804        if (ret)
6805                goto err_out_disk;
6806
6807        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6808        up_write(&rbd_dev->header_rwsem);
6809        return 0;
6810
6811err_out_disk:
6812        rbd_free_disk(rbd_dev);
6813err_out_blkdev:
6814        if (!single_major)
6815                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6816err_out_unlock:
6817        up_write(&rbd_dev->header_rwsem);
6818        return ret;
6819}
6820
6821static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6822{
6823        struct rbd_spec *spec = rbd_dev->spec;
6824        int ret;
6825
6826        /* Record the header object name for this rbd image. */
6827
6828        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6829        if (rbd_dev->image_format == 1)
6830                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6831                                       spec->image_name, RBD_SUFFIX);
6832        else
6833                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6834                                       RBD_HEADER_PREFIX, spec->image_id);
6835
6836        return ret;
6837}
6838
6839static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6840{
6841        if (!is_snap) {
6842                pr_info("image %s/%s%s%s does not exist\n",
6843                        rbd_dev->spec->pool_name,
6844                        rbd_dev->spec->pool_ns ?: "",
6845                        rbd_dev->spec->pool_ns ? "/" : "",
6846                        rbd_dev->spec->image_name);
6847        } else {
6848                pr_info("snap %s/%s%s%s@%s does not exist\n",
6849                        rbd_dev->spec->pool_name,
6850                        rbd_dev->spec->pool_ns ?: "",
6851                        rbd_dev->spec->pool_ns ? "/" : "",
6852                        rbd_dev->spec->image_name,
6853                        rbd_dev->spec->snap_name);
6854        }
6855}
6856
6857static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6858{
6859        if (!rbd_is_ro(rbd_dev))
6860                rbd_unregister_watch(rbd_dev);
6861
6862        rbd_dev_unprobe(rbd_dev);
6863        rbd_dev->image_format = 0;
6864        kfree(rbd_dev->spec->image_id);
6865        rbd_dev->spec->image_id = NULL;
6866}
6867
6868/*
6869 * Probe for the existence of the header object for the given rbd
6870 * device.  If this image is the one being mapped (i.e., not a
6871 * parent), initiate a watch on its header object before using that
6872 * object to get detailed information about the rbd image.
6873 *
6874 * On success, returns with header_rwsem held for write if called
6875 * with @depth == 0.
6876 */
6877static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6878{
6879        bool need_watch = !rbd_is_ro(rbd_dev);
6880        int ret;
6881
6882        /*
6883         * Get the id from the image id object.  Unless there's an
6884         * error, rbd_dev->spec->image_id will be filled in with
6885         * a dynamically-allocated string, and rbd_dev->image_format
6886         * will be set to either 1 or 2.
6887         */
6888        ret = rbd_dev_image_id(rbd_dev);
6889        if (ret)
6890                return ret;
6891
6892        ret = rbd_dev_header_name(rbd_dev);
6893        if (ret)
6894                goto err_out_format;
6895
6896        if (need_watch) {
6897                ret = rbd_register_watch(rbd_dev);
6898                if (ret) {
6899                        if (ret == -ENOENT)
6900                                rbd_print_dne(rbd_dev, false);
6901                        goto err_out_format;
6902                }
6903        }
6904
6905        if (!depth)
6906                down_write(&rbd_dev->header_rwsem);
6907
6908        ret = rbd_dev_header_info(rbd_dev);
6909        if (ret) {
6910                if (ret == -ENOENT && !need_watch)
6911                        rbd_print_dne(rbd_dev, false);
6912                goto err_out_probe;
6913        }
6914
6915        /*
6916         * If this image is the one being mapped, we have pool name and
6917         * id, image name and id, and snap name - need to fill snap id.
6918         * Otherwise this is a parent image, identified by pool, image
6919         * and snap ids - need to fill in names for those ids.
6920         */
6921        if (!depth)
6922                ret = rbd_spec_fill_snap_id(rbd_dev);
6923        else
6924                ret = rbd_spec_fill_names(rbd_dev);
6925        if (ret) {
6926                if (ret == -ENOENT)
6927                        rbd_print_dne(rbd_dev, true);
6928                goto err_out_probe;
6929        }
6930
6931        ret = rbd_dev_mapping_set(rbd_dev);
6932        if (ret)
6933                goto err_out_probe;
6934
6935        if (rbd_is_snap(rbd_dev) &&
6936            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6937                ret = rbd_object_map_load(rbd_dev);
6938                if (ret)
6939                        goto err_out_probe;
6940        }
6941
6942        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6943                ret = rbd_dev_v2_parent_info(rbd_dev);
6944                if (ret)
6945                        goto err_out_probe;
6946        }
6947
6948        ret = rbd_dev_probe_parent(rbd_dev, depth);
6949        if (ret)
6950                goto err_out_probe;
6951
6952        dout("discovered format %u image, header name is %s\n",
6953                rbd_dev->image_format, rbd_dev->header_oid.name);
6954        return 0;
6955
6956err_out_probe:
6957        if (!depth)
6958                up_write(&rbd_dev->header_rwsem);
6959        if (need_watch)
6960                rbd_unregister_watch(rbd_dev);
6961        rbd_dev_unprobe(rbd_dev);
6962err_out_format:
6963        rbd_dev->image_format = 0;
6964        kfree(rbd_dev->spec->image_id);
6965        rbd_dev->spec->image_id = NULL;
6966        return ret;
6967}
6968
6969static ssize_t do_rbd_add(struct bus_type *bus,
6970                          const char *buf,
6971                          size_t count)
6972{
6973        struct rbd_device *rbd_dev = NULL;
6974        struct ceph_options *ceph_opts = NULL;
6975        struct rbd_options *rbd_opts = NULL;
6976        struct rbd_spec *spec = NULL;
6977        struct rbd_client *rbdc;
6978        int rc;
6979
6980        if (!capable(CAP_SYS_ADMIN))
6981                return -EPERM;
6982
6983        if (!try_module_get(THIS_MODULE))
6984                return -ENODEV;
6985
6986        /* parse add command */
6987        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6988        if (rc < 0)
6989                goto out;
6990
6991        rbdc = rbd_get_client(ceph_opts);
6992        if (IS_ERR(rbdc)) {
6993                rc = PTR_ERR(rbdc);
6994                goto err_out_args;
6995        }
6996
6997        /* pick the pool */
6998        rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
6999        if (rc < 0) {
7000                if (rc == -ENOENT)
7001                        pr_info("pool %s does not exist\n", spec->pool_name);
7002                goto err_out_client;
7003        }
7004        spec->pool_id = (u64)rc;
7005
7006        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7007        if (!rbd_dev) {
7008                rc = -ENOMEM;
7009                goto err_out_client;
7010        }
7011        rbdc = NULL;            /* rbd_dev now owns this */
7012        spec = NULL;            /* rbd_dev now owns this */
7013        rbd_opts = NULL;        /* rbd_dev now owns this */
7014
7015        /* if we are mapping a snapshot it will be a read-only mapping */
7016        if (rbd_dev->opts->read_only ||
7017            strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7018                __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7019
7020        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7021        if (!rbd_dev->config_info) {
7022                rc = -ENOMEM;
7023                goto err_out_rbd_dev;
7024        }
7025
7026        rc = rbd_dev_image_probe(rbd_dev, 0);
7027        if (rc < 0)
7028                goto err_out_rbd_dev;
7029
7030        if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7031                rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7032                         rbd_dev->layout.object_size);
7033                rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7034        }
7035
7036        rc = rbd_dev_device_setup(rbd_dev);
7037        if (rc)
7038                goto err_out_image_probe;
7039
7040        rc = rbd_add_acquire_lock(rbd_dev);
7041        if (rc)
7042                goto err_out_image_lock;
7043
7044        /* Everything's ready.  Announce the disk to the world. */
7045
7046        rc = device_add(&rbd_dev->dev);
7047        if (rc)
7048                goto err_out_image_lock;
7049
7050        device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7051        /* see rbd_init_disk() */
7052        blk_put_queue(rbd_dev->disk->queue);
7053
7054        spin_lock(&rbd_dev_list_lock);
7055        list_add_tail(&rbd_dev->node, &rbd_dev_list);
7056        spin_unlock(&rbd_dev_list_lock);
7057
7058        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7059                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7060                rbd_dev->header.features);
7061        rc = count;
7062out:
7063        module_put(THIS_MODULE);
7064        return rc;
7065
7066err_out_image_lock:
7067        rbd_dev_image_unlock(rbd_dev);
7068        rbd_dev_device_release(rbd_dev);
7069err_out_image_probe:
7070        rbd_dev_image_release(rbd_dev);
7071err_out_rbd_dev:
7072        rbd_dev_destroy(rbd_dev);
7073err_out_client:
7074        rbd_put_client(rbdc);
7075err_out_args:
7076        rbd_spec_put(spec);
7077        kfree(rbd_opts);
7078        goto out;
7079}
7080
7081static ssize_t rbd_add(struct bus_type *bus,
7082                       const char *buf,
7083                       size_t count)
7084{
7085        if (single_major)
7086                return -EINVAL;
7087
7088        return do_rbd_add(bus, buf, count);
7089}
7090
7091static ssize_t rbd_add_single_major(struct bus_type *bus,
7092                                    const char *buf,
7093                                    size_t count)
7094{
7095        return do_rbd_add(bus, buf, count);
7096}
7097
7098static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7099{
7100        while (rbd_dev->parent) {
7101                struct rbd_device *first = rbd_dev;
7102                struct rbd_device *second = first->parent;
7103                struct rbd_device *third;
7104
7105                /*
7106                 * Follow to the parent with no grandparent and
7107                 * remove it.
7108                 */
7109                while (second && (third = second->parent)) {
7110                        first = second;
7111                        second = third;
7112                }
7113                rbd_assert(second);
7114                rbd_dev_image_release(second);
7115                rbd_dev_destroy(second);
7116                first->parent = NULL;
7117                first->parent_overlap = 0;
7118
7119                rbd_assert(first->parent_spec);
7120                rbd_spec_put(first->parent_spec);
7121                first->parent_spec = NULL;
7122        }
7123}
7124
7125static ssize_t do_rbd_remove(struct bus_type *bus,
7126                             const char *buf,
7127                             size_t count)
7128{
7129        struct rbd_device *rbd_dev = NULL;
7130        struct list_head *tmp;
7131        int dev_id;
7132        char opt_buf[6];
7133        bool force = false;
7134        int ret;
7135
7136        if (!capable(CAP_SYS_ADMIN))
7137                return -EPERM;
7138
7139        dev_id = -1;
7140        opt_buf[0] = '\0';
7141        sscanf(buf, "%d %5s", &dev_id, opt_buf);
7142        if (dev_id < 0) {
7143                pr_err("dev_id out of range\n");
7144                return -EINVAL;
7145        }
7146        if (opt_buf[0] != '\0') {
7147                if (!strcmp(opt_buf, "force")) {
7148                        force = true;
7149                } else {
7150                        pr_err("bad remove option at '%s'\n", opt_buf);
7151                        return -EINVAL;
7152                }
7153        }
7154
7155        ret = -ENOENT;
7156        spin_lock(&rbd_dev_list_lock);
7157        list_for_each(tmp, &rbd_dev_list) {
7158                rbd_dev = list_entry(tmp, struct rbd_device, node);
7159                if (rbd_dev->dev_id == dev_id) {
7160                        ret = 0;
7161                        break;
7162                }
7163        }
7164        if (!ret) {
7165                spin_lock_irq(&rbd_dev->lock);
7166                if (rbd_dev->open_count && !force)
7167                        ret = -EBUSY;
7168                else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7169                                          &rbd_dev->flags))
7170                        ret = -EINPROGRESS;
7171                spin_unlock_irq(&rbd_dev->lock);
7172        }
7173        spin_unlock(&rbd_dev_list_lock);
7174        if (ret)
7175                return ret;
7176
7177        if (force) {
7178                /*
7179                 * Prevent new IO from being queued and wait for existing
7180                 * IO to complete/fail.
7181                 */
7182                blk_mq_freeze_queue(rbd_dev->disk->queue);
7183                blk_set_queue_dying(rbd_dev->disk->queue);
7184        }
7185
7186        del_gendisk(rbd_dev->disk);
7187        spin_lock(&rbd_dev_list_lock);
7188        list_del_init(&rbd_dev->node);
7189        spin_unlock(&rbd_dev_list_lock);
7190        device_del(&rbd_dev->dev);
7191
7192        rbd_dev_image_unlock(rbd_dev);
7193        rbd_dev_device_release(rbd_dev);
7194        rbd_dev_image_release(rbd_dev);
7195        rbd_dev_destroy(rbd_dev);
7196        return count;
7197}
7198
7199static ssize_t rbd_remove(struct bus_type *bus,
7200                          const char *buf,
7201                          size_t count)
7202{
7203        if (single_major)
7204                return -EINVAL;
7205
7206        return do_rbd_remove(bus, buf, count);
7207}
7208
7209static ssize_t rbd_remove_single_major(struct bus_type *bus,
7210                                       const char *buf,
7211                                       size_t count)
7212{
7213        return do_rbd_remove(bus, buf, count);
7214}
7215
7216/*
7217 * create control files in sysfs
7218 * /sys/bus/rbd/...
7219 */
7220static int __init rbd_sysfs_init(void)
7221{
7222        int ret;
7223
7224        ret = device_register(&rbd_root_dev);
7225        if (ret < 0)
7226                return ret;
7227
7228        ret = bus_register(&rbd_bus_type);
7229        if (ret < 0)
7230                device_unregister(&rbd_root_dev);
7231
7232        return ret;
7233}
7234
7235static void __exit rbd_sysfs_cleanup(void)
7236{
7237        bus_unregister(&rbd_bus_type);
7238        device_unregister(&rbd_root_dev);
7239}
7240
7241static int __init rbd_slab_init(void)
7242{
7243        rbd_assert(!rbd_img_request_cache);
7244        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7245        if (!rbd_img_request_cache)
7246                return -ENOMEM;
7247
7248        rbd_assert(!rbd_obj_request_cache);
7249        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7250        if (!rbd_obj_request_cache)
7251                goto out_err;
7252
7253        return 0;
7254
7255out_err:
7256        kmem_cache_destroy(rbd_img_request_cache);
7257        rbd_img_request_cache = NULL;
7258        return -ENOMEM;
7259}
7260
7261static void rbd_slab_exit(void)
7262{
7263        rbd_assert(rbd_obj_request_cache);
7264        kmem_cache_destroy(rbd_obj_request_cache);
7265        rbd_obj_request_cache = NULL;
7266
7267        rbd_assert(rbd_img_request_cache);
7268        kmem_cache_destroy(rbd_img_request_cache);
7269        rbd_img_request_cache = NULL;
7270}
7271
7272static int __init rbd_init(void)
7273{
7274        int rc;
7275
7276        if (!libceph_compatible(NULL)) {
7277                rbd_warn(NULL, "libceph incompatibility (quitting)");
7278                return -EINVAL;
7279        }
7280
7281        rc = rbd_slab_init();
7282        if (rc)
7283                return rc;
7284
7285        /*
7286         * The number of active work items is limited by the number of
7287         * rbd devices * queue depth, so leave @max_active at default.
7288         */
7289        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7290        if (!rbd_wq) {
7291                rc = -ENOMEM;
7292                goto err_out_slab;
7293        }
7294
7295        if (single_major) {
7296                rbd_major = register_blkdev(0, RBD_DRV_NAME);
7297                if (rbd_major < 0) {
7298                        rc = rbd_major;
7299                        goto err_out_wq;
7300                }
7301        }
7302
7303        rc = rbd_sysfs_init();
7304        if (rc)
7305                goto err_out_blkdev;
7306
7307        if (single_major)
7308                pr_info("loaded (major %d)\n", rbd_major);
7309        else
7310                pr_info("loaded\n");
7311
7312        return 0;
7313
7314err_out_blkdev:
7315        if (single_major)
7316                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7317err_out_wq:
7318        destroy_workqueue(rbd_wq);
7319err_out_slab:
7320        rbd_slab_exit();
7321        return rc;
7322}
7323
7324static void __exit rbd_exit(void)
7325{
7326        ida_destroy(&rbd_dev_id_ida);
7327        rbd_sysfs_cleanup();
7328        if (single_major)
7329                unregister_blkdev(rbd_major, RBD_DRV_NAME);
7330        destroy_workqueue(rbd_wq);
7331        rbd_slab_exit();
7332}
7333
7334module_init(rbd_init);
7335module_exit(rbd_exit);
7336
7337MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7338MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7339MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7340/* following authorship retained from original osdblk.c */
7341MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7342
7343MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7344MODULE_LICENSE("GPL");
7345