linux/drivers/block/rbd.c
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/cls_lock_client.h>
  35#include <linux/ceph/decode.h>
  36#include <linux/parser.h>
  37#include <linux/bsearch.h>
  38
  39#include <linux/kernel.h>
  40#include <linux/device.h>
  41#include <linux/module.h>
  42#include <linux/blk-mq.h>
  43#include <linux/fs.h>
  44#include <linux/blkdev.h>
  45#include <linux/slab.h>
  46#include <linux/idr.h>
  47#include <linux/workqueue.h>
  48
  49#include "rbd_types.h"
  50
  51#define RBD_DEBUG       /* Activate rbd_assert() calls */
  52
  53/*
  54 * The basic unit of block I/O is a sector.  It is interpreted in a
  55 * number of contexts in Linux (blk, bio, genhd), but the default is
  56 * universally 512 bytes.  These symbols are just slightly more
  57 * meaningful than the bare numbers they represent.
  58 */
  59#define SECTOR_SHIFT    9
  60#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
  61
  62/*
  63 * Increment the given counter and return its updated value.
   64 * If the counter is already 0 it will not be incremented, and
   65 * 0 is returned.  If the counter is already at its maximum
   66 * value, -EINVAL is returned without updating it.
  67 */
  68static int atomic_inc_return_safe(atomic_t *v)
  69{
  70        unsigned int counter;
  71
  72        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
  73        if (counter <= (unsigned int)INT_MAX)
  74                return (int)counter;
  75
  76        atomic_dec(v);
  77
  78        return -EINVAL;
  79}
  80
  81/* Decrement the counter.  Return the resulting value, or -EINVAL */
  82static int atomic_dec_return_safe(atomic_t *v)
  83{
  84        int counter;
  85
  86        counter = atomic_dec_return(v);
  87        if (counter >= 0)
  88                return counter;
  89
  90        atomic_inc(v);
  91
  92        return -EINVAL;
  93}
  94
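/*
 * Illustrative sketch (not part of the driver): how the *_safe counter
 * helpers above are typically paired around a guarded reference such as
 * a parent image refcount.  The function names here are hypothetical.
 */
#if 0
static bool example_get_ref(atomic_t *ref)
{
        /* Succeeds only while the counter is live (> 0) and not saturated */
        return atomic_inc_return_safe(ref) > 0;
}

static void example_put_ref(atomic_t *ref)
{
        /* A negative return indicates an unbalanced put */
        if (atomic_dec_return_safe(ref) < 0)
                pr_warn("unbalanced ref put\n");
}
#endif
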
  95#define RBD_DRV_NAME "rbd"
  96
  97#define RBD_MINORS_PER_MAJOR            256
  98#define RBD_SINGLE_MAJOR_PART_SHIFT     4
  99
 100#define RBD_MAX_PARENT_CHAIN_LEN        16
 101
 102#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
 103#define RBD_MAX_SNAP_NAME_LEN   \
 104                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
 105
 106#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
 107
 108#define RBD_SNAP_HEAD_NAME      "-"
 109
 110#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
 111
  112/* This allows a single page to hold an image name sent by the OSD */
 113#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
 114#define RBD_IMAGE_ID_LEN_MAX    64
 115
 116#define RBD_OBJ_PREFIX_LEN_MAX  64
 117
 118#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
 119#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
 120
 121/* Feature bits */
 122
 123#define RBD_FEATURE_LAYERING    (1<<0)
 124#define RBD_FEATURE_STRIPINGV2  (1<<1)
 125#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
 126#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
 127                                 RBD_FEATURE_STRIPINGV2 |       \
 128                                 RBD_FEATURE_EXCLUSIVE_LOCK)
 129
 130/* Features supported by this (client software) implementation. */
 131
 132#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
 133
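/*
 * Illustrative sketch (not from the driver as shown here): an image is
 * only safe to map when it advertises no feature bits outside
 * RBD_FEATURES_SUPPORTED.  The variables below stand in for locals in
 * the probing code.
 */
#if 0
        if (features & ~RBD_FEATURES_SUPPORTED) {
                rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
                         features & ~RBD_FEATURES_SUPPORTED);
                return -ENXIO;
        }
#endif
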
 134/*
 135 * An RBD device name will be "rbd#", where the "rbd" comes from
 136 * RBD_DRV_NAME above, and # is a unique integer identifier.
 137 */
 138#define DEV_NAME_LEN            32
 139
 140/*
 141 * block device image metadata (in-memory version)
 142 */
 143struct rbd_image_header {
 144        /* These six fields never change for a given rbd image */
 145        char *object_prefix;
 146        __u8 obj_order;
 147        __u8 crypt_type;
 148        __u8 comp_type;
 149        u64 stripe_unit;
 150        u64 stripe_count;
 151        u64 features;           /* Might be changeable someday? */
 152
 153        /* The remaining fields need to be updated occasionally */
 154        u64 image_size;
 155        struct ceph_snap_context *snapc;
 156        char *snap_names;       /* format 1 only */
 157        u64 *snap_sizes;        /* format 1 only */
 158};
 159
 160/*
 161 * An rbd image specification.
 162 *
 163 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 164 * identify an image.  Each rbd_dev structure includes a pointer to
 165 * an rbd_spec structure that encapsulates this identity.
 166 *
 167 * Each of the id's in an rbd_spec has an associated name.  For a
 168 * user-mapped image, the names are supplied and the id's associated
 169 * with them are looked up.  For a layered image, a parent image is
 170 * defined by the tuple, and the names are looked up.
 171 *
 172 * An rbd_dev structure contains a parent_spec pointer which is
 173 * non-null if the image it represents is a child in a layered
 174 * image.  This pointer will refer to the rbd_spec structure used
 175 * by the parent rbd_dev for its own identity (i.e., the structure
 176 * is shared between the parent and child).
 177 *
 178 * Since these structures are populated once, during the discovery
 179 * phase of image construction, they are effectively immutable so
 180 * we make no effort to synchronize access to them.
 181 *
 182 * Note that code herein does not assume the image name is known (it
 183 * could be a null pointer).
 184 */
 185struct rbd_spec {
 186        u64             pool_id;
 187        const char      *pool_name;
 188
 189        const char      *image_id;
 190        const char      *image_name;
 191
 192        u64             snap_id;
 193        const char      *snap_name;
 194
 195        struct kref     kref;
 196};
 197
 198/*
 199 * an instance of the client.  multiple devices may share an rbd client.
 200 */
 201struct rbd_client {
 202        struct ceph_client      *client;
 203        struct kref             kref;
 204        struct list_head        node;
 205};
 206
 207struct rbd_img_request;
 208typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
 209
 210#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
 211
 212struct rbd_obj_request;
 213typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 214
 215enum obj_request_type {
 216        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 217};
 218
 219enum obj_operation_type {
 220        OBJ_OP_WRITE,
 221        OBJ_OP_READ,
 222        OBJ_OP_DISCARD,
 223};
 224
 225enum obj_req_flags {
 226        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
 227        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
 228        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
 229        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
 230};
 231
 232struct rbd_obj_request {
 233        const char              *object_name;
 234        u64                     offset;         /* object start byte */
 235        u64                     length;         /* bytes from offset */
 236        unsigned long           flags;
 237
 238        /*
 239         * An object request associated with an image will have its
 240         * img_data flag set; a standalone object request will not.
 241         *
 242         * A standalone object request will have which == BAD_WHICH
 243         * and a null obj_request pointer.
 244         *
 245         * An object request initiated in support of a layered image
 246         * object (to check for its existence before a write) will
 247         * have which == BAD_WHICH and a non-null obj_request pointer.
 248         *
 249         * Finally, an object request for rbd image data will have
 250         * which != BAD_WHICH, and will have a non-null img_request
 251         * pointer.  The value of which will be in the range
 252         * 0..(img_request->obj_request_count-1).
 253         */
 254        union {
 255                struct rbd_obj_request  *obj_request;   /* STAT op */
 256                struct {
 257                        struct rbd_img_request  *img_request;
 258                        u64                     img_offset;
 259                        /* links for img_request->obj_requests list */
 260                        struct list_head        links;
 261                };
 262        };
 263        u32                     which;          /* posn image request list */
 264
 265        enum obj_request_type   type;
 266        union {
 267                struct bio      *bio_list;
 268                struct {
 269                        struct page     **pages;
 270                        u32             page_count;
 271                };
 272        };
 273        struct page             **copyup_pages;
 274        u32                     copyup_page_count;
 275
 276        struct ceph_osd_request *osd_req;
 277
 278        u64                     xferred;        /* bytes transferred */
 279        int                     result;
 280
 281        rbd_obj_callback_t      callback;
 282        struct completion       completion;
 283
 284        struct kref             kref;
 285};
 286
 287enum img_req_flags {
 288        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
 289        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
 290        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 291        IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
 292};
 293
 294struct rbd_img_request {
 295        struct rbd_device       *rbd_dev;
 296        u64                     offset; /* starting image byte offset */
 297        u64                     length; /* byte count from offset */
 298        unsigned long           flags;
 299        union {
 300                u64                     snap_id;        /* for reads */
 301                struct ceph_snap_context *snapc;        /* for writes */
 302        };
 303        union {
 304                struct request          *rq;            /* block request */
 305                struct rbd_obj_request  *obj_request;   /* obj req initiator */
 306        };
 307        struct page             **copyup_pages;
 308        u32                     copyup_page_count;
 309        spinlock_t              completion_lock;/* protects next_completion */
 310        u32                     next_completion;
 311        rbd_img_callback_t      callback;
 312        u64                     xferred;/* aggregate bytes transferred */
 313        int                     result; /* first nonzero obj_request result */
 314
 315        u32                     obj_request_count;
 316        struct list_head        obj_requests;   /* rbd_obj_request structs */
 317
 318        struct kref             kref;
 319};
 320
 321#define for_each_obj_request(ireq, oreq) \
 322        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
 323#define for_each_obj_request_from(ireq, oreq) \
 324        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
 325#define for_each_obj_request_safe(ireq, oreq, n) \
 326        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 327
 328enum rbd_watch_state {
 329        RBD_WATCH_STATE_UNREGISTERED,
 330        RBD_WATCH_STATE_REGISTERED,
 331        RBD_WATCH_STATE_ERROR,
 332};
 333
 334enum rbd_lock_state {
 335        RBD_LOCK_STATE_UNLOCKED,
 336        RBD_LOCK_STATE_LOCKED,
 337        RBD_LOCK_STATE_RELEASING,
 338};
 339
 340/* WatchNotify::ClientId */
 341struct rbd_client_id {
 342        u64 gid;
 343        u64 handle;
 344};
 345
 346struct rbd_mapping {
 347        u64                     size;
 348        u64                     features;
 349        bool                    read_only;
 350};
 351
 352/*
 353 * a single device
 354 */
 355struct rbd_device {
 356        int                     dev_id;         /* blkdev unique id */
 357
 358        int                     major;          /* blkdev assigned major */
 359        int                     minor;
 360        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 361
 362        u32                     image_format;   /* Either 1 or 2 */
 363        struct rbd_client       *rbd_client;
 364
 365        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 366
 367        spinlock_t              lock;           /* queue, flags, open_count */
 368
 369        struct rbd_image_header header;
 370        unsigned long           flags;          /* possibly lock protected */
 371        struct rbd_spec         *spec;
 372        struct rbd_options      *opts;
 373        char                    *config_info;   /* add{,_single_major} string */
 374
 375        struct ceph_object_id   header_oid;
 376        struct ceph_object_locator header_oloc;
 377
 378        struct ceph_file_layout layout;         /* used for all rbd requests */
 379
 380        struct mutex            watch_mutex;
 381        enum rbd_watch_state    watch_state;
 382        struct ceph_osd_linger_request *watch_handle;
 383        u64                     watch_cookie;
 384        struct delayed_work     watch_dwork;
 385
 386        struct rw_semaphore     lock_rwsem;
 387        enum rbd_lock_state     lock_state;
 388        struct rbd_client_id    owner_cid;
 389        struct work_struct      acquired_lock_work;
 390        struct work_struct      released_lock_work;
 391        struct delayed_work     lock_dwork;
 392        struct work_struct      unlock_work;
 393        wait_queue_head_t       lock_waitq;
 394
 395        struct workqueue_struct *task_wq;
 396
 397        struct rbd_spec         *parent_spec;
 398        u64                     parent_overlap;
 399        atomic_t                parent_ref;
 400        struct rbd_device       *parent;
 401
 402        /* Block layer tags. */
 403        struct blk_mq_tag_set   tag_set;
 404
 405        /* protects updating the header */
 406        struct rw_semaphore     header_rwsem;
 407
 408        struct rbd_mapping      mapping;
 409
 410        struct list_head        node;
 411
 412        /* sysfs related */
 413        struct device           dev;
 414        unsigned long           open_count;     /* protected by lock */
 415};
 416
 417/*
 418 * Flag bits for rbd_dev->flags:
 419 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 420 *   by rbd_dev->lock
 421 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 422 */
 423enum rbd_dev_flags {
 424        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
 425        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 426        RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
 427};
 428
 429static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 430
 431static LIST_HEAD(rbd_dev_list);    /* devices */
 432static DEFINE_SPINLOCK(rbd_dev_list_lock);
 433
 434static LIST_HEAD(rbd_client_list);              /* clients */
 435static DEFINE_SPINLOCK(rbd_client_list_lock);
 436
 437/* Slab caches for frequently-allocated structures */
 438
 439static struct kmem_cache        *rbd_img_request_cache;
 440static struct kmem_cache        *rbd_obj_request_cache;
 441static struct kmem_cache        *rbd_segment_name_cache;
 442
 443static int rbd_major;
 444static DEFINE_IDA(rbd_dev_id_ida);
 445
 446static struct workqueue_struct *rbd_wq;
 447
 448/*
  449 * Default to false for now, as single-major requires version >= 0.75
  450 * of the userspace rbd utility.
 451 */
 452static bool single_major = false;
 453module_param(single_major, bool, S_IRUGO);
 454MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
 455
 456static int rbd_img_request_submit(struct rbd_img_request *img_request);
 457
 458static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 459                       size_t count);
 460static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 461                          size_t count);
 462static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
 463                                    size_t count);
 464static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 465                                       size_t count);
 466static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 467static void rbd_spec_put(struct rbd_spec *spec);
 468
 469static struct bus_attribute rbd_bus_attrs[] = {
 470        __ATTR(add, S_IWUSR, NULL, rbd_add),
 471        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
 472        __ATTR_NULL
 473};
 474
 475static struct bus_attribute rbd_bus_attrs_single_major[] = {
 476        __ATTR(add, S_IWUSR, NULL, rbd_add),
 477        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
 478        __ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major),
 479        __ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major),
 480        __ATTR_NULL
 481};
 482
 483static int rbd_dev_id_to_minor(int dev_id)
 484{
 485        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
 486}
 487
 488static int minor_to_rbd_dev_id(int minor)
 489{
 490        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 491}
 492
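/*
 * Worked example (illustrative only): with RBD_SINGLE_MAJOR_PART_SHIFT
 * of 4, each device owns 16 consecutive minors in single-major mode:
 * rbd_dev_id_to_minor(3) == 48, minors 48..63 cover dev_id 3 and its
 * partitions, and minor_to_rbd_dev_id(53) == 3.
 */
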
 493static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
 494{
 495        return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
 496               rbd_dev->spec->snap_id == CEPH_NOSNAP &&
 497               !rbd_dev->mapping.read_only;
 498}
 499
 500static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 501{
 502        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 503               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 504}
 505
 506static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
 507{
 508        bool is_lock_owner;
 509
 510        down_read(&rbd_dev->lock_rwsem);
 511        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
 512        up_read(&rbd_dev->lock_rwsem);
 513        return is_lock_owner;
 514}
 515
 516static struct bus_type rbd_bus_type = {
 517        .name           = "rbd",
 518        .bus_attrs      = rbd_bus_attrs,
 519};
 520
 521static void rbd_root_dev_release(struct device *dev)
 522{
 523}
 524
 525static struct device rbd_root_dev = {
 526        .init_name =    "rbd",
 527        .release =      rbd_root_dev_release,
 528};
 529
 530static __printf(2, 3)
 531void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 532{
 533        struct va_format vaf;
 534        va_list args;
 535
 536        va_start(args, fmt);
 537        vaf.fmt = fmt;
 538        vaf.va = &args;
 539
 540        if (!rbd_dev)
 541                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 542        else if (rbd_dev->disk)
 543                printk(KERN_WARNING "%s: %s: %pV\n",
 544                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 545        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 546                printk(KERN_WARNING "%s: image %s: %pV\n",
 547                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 548        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 549                printk(KERN_WARNING "%s: id %s: %pV\n",
 550                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 551        else    /* punt */
 552                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 553                        RBD_DRV_NAME, rbd_dev, &vaf);
 554        va_end(args);
 555}
 556
 557#ifdef RBD_DEBUG
 558#define rbd_assert(expr)                                                \
 559                if (unlikely(!(expr))) {                                \
 560                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 561                                                "at line %d:\n\n"       \
 562                                        "\trbd_assert(%s);\n\n",        \
 563                                        __func__, __LINE__, #expr);     \
 564                        BUG();                                          \
 565                }
 566#else /* !RBD_DEBUG */
 567#  define rbd_assert(expr)      ((void) 0)
 568#endif /* !RBD_DEBUG */
 569
 570static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
 571static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
 572static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 573static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 574
 575static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 576static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
 577static int rbd_dev_header_info(struct rbd_device *rbd_dev);
 578static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 579static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 580                                        u64 snap_id);
 581static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 582                                u8 *order, u64 *snap_size);
 583static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 584                u64 *snap_features);
 585
 586static int rbd_open(struct block_device *bdev, fmode_t mode)
 587{
 588        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 589        bool removing = false;
 590
 591        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
 592                return -EROFS;
 593
 594        spin_lock_irq(&rbd_dev->lock);
 595        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 596                removing = true;
 597        else
 598                rbd_dev->open_count++;
 599        spin_unlock_irq(&rbd_dev->lock);
 600        if (removing)
 601                return -ENOENT;
 602
 603        (void) get_device(&rbd_dev->dev);
 604
 605        return 0;
 606}
 607
 608static void rbd_release(struct gendisk *disk, fmode_t mode)
 609{
 610        struct rbd_device *rbd_dev = disk->private_data;
 611        unsigned long open_count_before;
 612
 613        spin_lock_irq(&rbd_dev->lock);
 614        open_count_before = rbd_dev->open_count--;
 615        spin_unlock_irq(&rbd_dev->lock);
 616        rbd_assert(open_count_before > 0);
 617
 618        put_device(&rbd_dev->dev);
 619}
 620
 621static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
 622{
 623        int ret = 0;
 624        int val;
 625        bool ro;
 626        bool ro_changed = false;
 627
 628        /* get_user() may sleep, so call it before taking rbd_dev->lock */
 629        if (get_user(val, (int __user *)(arg)))
 630                return -EFAULT;
 631
 632        ro = val ? true : false;
  633        /* Snapshots cannot be written to */
 634        if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
 635                return -EROFS;
 636
 637        spin_lock_irq(&rbd_dev->lock);
  638        /* prevent others from opening this device */
 639        if (rbd_dev->open_count > 1) {
 640                ret = -EBUSY;
 641                goto out;
 642        }
 643
 644        if (rbd_dev->mapping.read_only != ro) {
 645                rbd_dev->mapping.read_only = ro;
 646                ro_changed = true;
 647        }
 648
 649out:
 650        spin_unlock_irq(&rbd_dev->lock);
 651        /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
 652        if (ret == 0 && ro_changed)
 653                set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
 654
 655        return ret;
 656}
 657
 658static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
 659                        unsigned int cmd, unsigned long arg)
 660{
 661        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 662        int ret = 0;
 663
 664        switch (cmd) {
 665        case BLKROSET:
 666                ret = rbd_ioctl_set_ro(rbd_dev, arg);
 667                break;
 668        default:
 669                ret = -ENOTTY;
 670        }
 671
 672        return ret;
 673}
 674
 675#ifdef CONFIG_COMPAT
 676static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
 677                                unsigned int cmd, unsigned long arg)
 678{
 679        return rbd_ioctl(bdev, mode, cmd, arg);
 680}
 681#endif /* CONFIG_COMPAT */
 682
 683static const struct block_device_operations rbd_bd_ops = {
 684        .owner                  = THIS_MODULE,
 685        .open                   = rbd_open,
 686        .release                = rbd_release,
 687        .ioctl                  = rbd_ioctl,
 688#ifdef CONFIG_COMPAT
 689        .compat_ioctl           = rbd_compat_ioctl,
 690#endif
 691};
 692
 693/*
 694 * Initialize an rbd client instance.  Success or not, this function
 695 * consumes ceph_opts.  Caller holds client_mutex.
 696 */
 697static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 698{
 699        struct rbd_client *rbdc;
 700        int ret = -ENOMEM;
 701
 702        dout("%s:\n", __func__);
 703        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 704        if (!rbdc)
 705                goto out_opt;
 706
 707        kref_init(&rbdc->kref);
 708        INIT_LIST_HEAD(&rbdc->node);
 709
 710        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
 711        if (IS_ERR(rbdc->client))
 712                goto out_rbdc;
 713        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 714
 715        ret = ceph_open_session(rbdc->client);
 716        if (ret < 0)
 717                goto out_client;
 718
 719        spin_lock(&rbd_client_list_lock);
 720        list_add_tail(&rbdc->node, &rbd_client_list);
 721        spin_unlock(&rbd_client_list_lock);
 722
 723        dout("%s: rbdc %p\n", __func__, rbdc);
 724
 725        return rbdc;
 726out_client:
 727        ceph_destroy_client(rbdc->client);
 728out_rbdc:
 729        kfree(rbdc);
 730out_opt:
 731        if (ceph_opts)
 732                ceph_destroy_options(ceph_opts);
 733        dout("%s: error %d\n", __func__, ret);
 734
 735        return ERR_PTR(ret);
 736}
 737
 738static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 739{
 740        kref_get(&rbdc->kref);
 741
 742        return rbdc;
 743}
 744
 745/*
 746 * Find a ceph client with specific addr and configuration.  If
 747 * found, bump its reference count.
 748 */
 749static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 750{
 751        struct rbd_client *client_node;
 752        bool found = false;
 753
 754        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 755                return NULL;
 756
 757        spin_lock(&rbd_client_list_lock);
 758        list_for_each_entry(client_node, &rbd_client_list, node) {
 759                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 760                        __rbd_get_client(client_node);
 761
 762                        found = true;
 763                        break;
 764                }
 765        }
 766        spin_unlock(&rbd_client_list_lock);
 767
 768        return found ? client_node : NULL;
 769}
 770
 771/*
 772 * (Per device) rbd map options
 773 */
 774enum {
 775        Opt_queue_depth,
 776        Opt_last_int,
 777        /* int args above */
 778        Opt_last_string,
 779        /* string args above */
 780        Opt_read_only,
 781        Opt_read_write,
 782        Opt_lock_on_read,
 783        Opt_err
 784};
 785
 786static match_table_t rbd_opts_tokens = {
 787        {Opt_queue_depth, "queue_depth=%d"},
 788        /* int args above */
 789        /* string args above */
 790        {Opt_read_only, "read_only"},
 791        {Opt_read_only, "ro"},          /* Alternate spelling */
 792        {Opt_read_write, "read_write"},
 793        {Opt_read_write, "rw"},         /* Alternate spelling */
 794        {Opt_lock_on_read, "lock_on_read"},
 795        {Opt_err, NULL}
 796};
 797
 798struct rbd_options {
 799        int     queue_depth;
 800        bool    read_only;
 801        bool    lock_on_read;
 802};
 803
 804#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
 805#define RBD_READ_ONLY_DEFAULT   false
 806#define RBD_LOCK_ON_READ_DEFAULT false
 807
 808static int parse_rbd_opts_token(char *c, void *private)
 809{
 810        struct rbd_options *rbd_opts = private;
 811        substring_t argstr[MAX_OPT_ARGS];
 812        int token, intval, ret;
 813
 814        token = match_token(c, rbd_opts_tokens, argstr);
 815        if (token < Opt_last_int) {
 816                ret = match_int(&argstr[0], &intval);
 817                if (ret < 0) {
 818                        pr_err("bad mount option arg (not int) at '%s'\n", c);
 819                        return ret;
 820                }
 821                dout("got int token %d val %d\n", token, intval);
 822        } else if (token > Opt_last_int && token < Opt_last_string) {
 823                dout("got string token %d val %s\n", token, argstr[0].from);
 824        } else {
 825                dout("got token %d\n", token);
 826        }
 827
 828        switch (token) {
 829        case Opt_queue_depth:
 830                if (intval < 1) {
 831                        pr_err("queue_depth out of range\n");
 832                        return -EINVAL;
 833                }
 834                rbd_opts->queue_depth = intval;
 835                break;
 836        case Opt_read_only:
 837                rbd_opts->read_only = true;
 838                break;
 839        case Opt_read_write:
 840                rbd_opts->read_only = false;
 841                break;
 842        case Opt_lock_on_read:
 843                rbd_opts->lock_on_read = true;
 844                break;
 845        default:
 846                /* libceph prints "bad option" msg */
 847                return -EINVAL;
 848        }
 849
 850        return 0;
 851}
 852
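/*
 * Illustrative sketch (not from the driver): tokens as they would arrive
 * from a map options string such as "queue_depth=128,lock_on_read,ro".
 * Each comma-separated token is handed to parse_rbd_opts_token() with a
 * struct rbd_options as the private pointer; the function name below is
 * hypothetical.
 */
#if 0
static int example_parse_map_opts(struct rbd_options *rbd_opts)
{
        char qd[] = "queue_depth=128";
        char lor[] = "lock_on_read";
        char ro[] = "ro";
        int ret;

        ret = parse_rbd_opts_token(qd, rbd_opts);
        if (!ret)
                ret = parse_rbd_opts_token(lor, rbd_opts);
        if (!ret)
                ret = parse_rbd_opts_token(ro, rbd_opts);

        /* On success: queue_depth == 128, lock_on_read and read_only set */
        return ret;
}
#endif
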
  853static char *obj_op_name(enum obj_operation_type op_type)
 854{
 855        switch (op_type) {
 856        case OBJ_OP_READ:
 857                return "read";
 858        case OBJ_OP_WRITE:
 859                return "write";
 860        case OBJ_OP_DISCARD:
 861                return "discard";
 862        default:
 863                return "???";
 864        }
 865}
 866
 867/*
  868 * Get a ceph client with a specific addr and configuration; if one
  869 * does not exist, create it.  Either way, ceph_opts is consumed by this
 870 * function.
 871 */
 872static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 873{
 874        struct rbd_client *rbdc;
 875
 876        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
 877        rbdc = rbd_client_find(ceph_opts);
 878        if (rbdc)       /* using an existing client */
 879                ceph_destroy_options(ceph_opts);
 880        else
 881                rbdc = rbd_client_create(ceph_opts);
 882        mutex_unlock(&client_mutex);
 883
 884        return rbdc;
 885}
 886
 887/*
 888 * Destroy ceph client
 889 *
 890 * Caller must hold rbd_client_list_lock.
 891 */
 892static void rbd_client_release(struct kref *kref)
 893{
 894        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 895
 896        dout("%s: rbdc %p\n", __func__, rbdc);
 897        spin_lock(&rbd_client_list_lock);
 898        list_del(&rbdc->node);
 899        spin_unlock(&rbd_client_list_lock);
 900
 901        ceph_destroy_client(rbdc->client);
 902        kfree(rbdc);
 903}
 904
 905/*
 906 * Drop reference to ceph client node. If it's not referenced anymore, release
 907 * it.
 908 */
 909static void rbd_put_client(struct rbd_client *rbdc)
 910{
 911        if (rbdc)
 912                kref_put(&rbdc->kref, rbd_client_release);
 913}
 914
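/*
 * Illustrative sketch (not from the driver): typical client lifetime.
 * ceph_opts is always consumed by rbd_get_client(), whether it reuses an
 * existing client or creates a new one; the function name below is
 * hypothetical.
 */
#if 0
static int example_client_use(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_get_client(ceph_opts);       /* consumes ceph_opts */
        if (IS_ERR(rbdc))
                return PTR_ERR(rbdc);

        /* ...issue requests against rbdc->client... */

        rbd_put_client(rbdc);
        return 0;
}
#endif
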
 915static bool rbd_image_format_valid(u32 image_format)
 916{
 917        return image_format == 1 || image_format == 2;
 918}
 919
 920static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 921{
 922        size_t size;
 923        u32 snap_count;
 924
 925        /* The header has to start with the magic rbd header text */
 926        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 927                return false;
 928
 929        /* The bio layer requires at least sector-sized I/O */
 930
 931        if (ondisk->options.order < SECTOR_SHIFT)
 932                return false;
 933
 934        /* If we use u64 in a few spots we may be able to loosen this */
 935
 936        if (ondisk->options.order > 8 * sizeof (int) - 1)
 937                return false;
 938
 939        /*
 940         * The size of a snapshot header has to fit in a size_t, and
 941         * that limits the number of snapshots.
 942         */
 943        snap_count = le32_to_cpu(ondisk->snap_count);
 944        size = SIZE_MAX - sizeof (struct ceph_snap_context);
 945        if (snap_count > size / sizeof (__le64))
 946                return false;
 947
 948        /*
  949         * Not only that, but the size of the entire snapshot
 950         * header must also be representable in a size_t.
 951         */
 952        size -= snap_count * sizeof (__le64);
 953        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
 954                return false;
 955
 956        return true;
 957}
 958
 959/*
 960 * Fill an rbd image header with information from the given format 1
 961 * on-disk header.
 962 */
 963static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 964                                 struct rbd_image_header_ondisk *ondisk)
 965{
 966        struct rbd_image_header *header = &rbd_dev->header;
 967        bool first_time = header->object_prefix == NULL;
 968        struct ceph_snap_context *snapc;
 969        char *object_prefix = NULL;
 970        char *snap_names = NULL;
 971        u64 *snap_sizes = NULL;
 972        u32 snap_count;
 973        size_t size;
 974        int ret = -ENOMEM;
 975        u32 i;
 976
 977        /* Allocate this now to avoid having to handle failure below */
 978
 979        if (first_time) {
 980                size_t len;
 981
 982                len = strnlen(ondisk->object_prefix,
 983                                sizeof (ondisk->object_prefix));
 984                object_prefix = kmalloc(len + 1, GFP_KERNEL);
 985                if (!object_prefix)
 986                        return -ENOMEM;
 987                memcpy(object_prefix, ondisk->object_prefix, len);
 988                object_prefix[len] = '\0';
 989        }
 990
 991        /* Allocate the snapshot context and fill it in */
 992
 993        snap_count = le32_to_cpu(ondisk->snap_count);
 994        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
 995        if (!snapc)
 996                goto out_err;
 997        snapc->seq = le64_to_cpu(ondisk->snap_seq);
 998        if (snap_count) {
 999                struct rbd_image_snap_ondisk *snaps;
1000                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1001
1002                /* We'll keep a copy of the snapshot names... */
1003
1004                if (snap_names_len > (u64)SIZE_MAX)
1005                        goto out_2big;
1006                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1007                if (!snap_names)
1008                        goto out_err;
1009
1010                /* ...as well as the array of their sizes. */
1011
1012                size = snap_count * sizeof (*header->snap_sizes);
1013                snap_sizes = kmalloc(size, GFP_KERNEL);
1014                if (!snap_sizes)
1015                        goto out_err;
1016
1017                /*
1018                 * Copy the names, and fill in each snapshot's id
1019                 * and size.
1020                 *
1021                 * Note that rbd_dev_v1_header_info() guarantees the
1022                 * ondisk buffer we're working with has
1023                 * snap_names_len bytes beyond the end of the
 1024                 * snapshot id array, so this memcpy() is safe.
1025                 */
1026                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1027                snaps = ondisk->snaps;
1028                for (i = 0; i < snap_count; i++) {
1029                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1030                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1031                }
1032        }
1033
 1034        /* We won't fail any more; fill in the header */
1035
1036        if (first_time) {
1037                header->object_prefix = object_prefix;
1038                header->obj_order = ondisk->options.order;
1039                header->crypt_type = ondisk->options.crypt_type;
1040                header->comp_type = ondisk->options.comp_type;
1041                /* The rest aren't used for format 1 images */
1042                header->stripe_unit = 0;
1043                header->stripe_count = 0;
1044                header->features = 0;
1045        } else {
1046                ceph_put_snap_context(header->snapc);
1047                kfree(header->snap_names);
1048                kfree(header->snap_sizes);
1049        }
1050
1051        /* The remaining fields always get updated (when we refresh) */
1052
1053        header->image_size = le64_to_cpu(ondisk->image_size);
1054        header->snapc = snapc;
1055        header->snap_names = snap_names;
1056        header->snap_sizes = snap_sizes;
1057
1058        return 0;
1059out_2big:
1060        ret = -EIO;
1061out_err:
1062        kfree(snap_sizes);
1063        kfree(snap_names);
1064        ceph_put_snap_context(snapc);
1065        kfree(object_prefix);
1066
1067        return ret;
1068}
1069
1070static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1071{
1072        const char *snap_name;
1073
1074        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1075
1076        /* Skip over names until we find the one we are looking for */
1077
1078        snap_name = rbd_dev->header.snap_names;
1079        while (which--)
1080                snap_name += strlen(snap_name) + 1;
1081
1082        return kstrdup(snap_name, GFP_KERNEL);
1083}
1084
1085/*
1086 * Snapshot id comparison function for use with qsort()/bsearch().
1087 * Note that result is for snapshots in *descending* order.
1088 */
1089static int snapid_compare_reverse(const void *s1, const void *s2)
1090{
1091        u64 snap_id1 = *(u64 *)s1;
1092        u64 snap_id2 = *(u64 *)s2;
1093
1094        if (snap_id1 < snap_id2)
1095                return 1;
1096        return snap_id1 == snap_id2 ? 0 : -1;
1097}
1098
1099/*
1100 * Search a snapshot context to see if the given snapshot id is
1101 * present.
1102 *
1103 * Returns the position of the snapshot id in the array if it's found,
1104 * or BAD_SNAP_INDEX otherwise.
1105 *
 1106 * Note: The snapshot array is kept sorted (by the osd) in
1107 * reverse order, highest snapshot id first.
1108 */
1109static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1110{
1111        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1112        u64 *found;
1113
1114        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1115                                sizeof (snap_id), snapid_compare_reverse);
1116
1117        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1118}
1119
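/*
 * Illustrative example: with snapc->snaps = { 12, 7, 3 } (newest id
 * first, as the osd keeps it), rbd_dev_snap_index() returns 1 for
 * snap_id 7 and BAD_SNAP_INDEX for an id that is not present, e.g. 5.
 */
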
1120static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1121                                        u64 snap_id)
1122{
1123        u32 which;
1124        const char *snap_name;
1125
1126        which = rbd_dev_snap_index(rbd_dev, snap_id);
1127        if (which == BAD_SNAP_INDEX)
1128                return ERR_PTR(-ENOENT);
1129
1130        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1131        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1132}
1133
1134static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1135{
1136        if (snap_id == CEPH_NOSNAP)
1137                return RBD_SNAP_HEAD_NAME;
1138
1139        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1140        if (rbd_dev->image_format == 1)
1141                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1142
1143        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1144}
1145
1146static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1147                                u64 *snap_size)
1148{
1149        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1150        if (snap_id == CEPH_NOSNAP) {
1151                *snap_size = rbd_dev->header.image_size;
1152        } else if (rbd_dev->image_format == 1) {
1153                u32 which;
1154
1155                which = rbd_dev_snap_index(rbd_dev, snap_id);
1156                if (which == BAD_SNAP_INDEX)
1157                        return -ENOENT;
1158
1159                *snap_size = rbd_dev->header.snap_sizes[which];
1160        } else {
1161                u64 size = 0;
1162                int ret;
1163
1164                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1165                if (ret)
1166                        return ret;
1167
1168                *snap_size = size;
1169        }
1170        return 0;
1171}
1172
1173static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1174                        u64 *snap_features)
1175{
1176        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1177        if (snap_id == CEPH_NOSNAP) {
1178                *snap_features = rbd_dev->header.features;
1179        } else if (rbd_dev->image_format == 1) {
1180                *snap_features = 0;     /* No features for format 1 */
1181        } else {
1182                u64 features = 0;
1183                int ret;
1184
1185                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1186                if (ret)
1187                        return ret;
1188
1189                *snap_features = features;
1190        }
1191        return 0;
1192}
1193
1194static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1195{
1196        u64 snap_id = rbd_dev->spec->snap_id;
1197        u64 size = 0;
1198        u64 features = 0;
1199        int ret;
1200
1201        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1202        if (ret)
1203                return ret;
1204        ret = rbd_snap_features(rbd_dev, snap_id, &features);
1205        if (ret)
1206                return ret;
1207
1208        rbd_dev->mapping.size = size;
1209        rbd_dev->mapping.features = features;
1210
1211        return 0;
1212}
1213
1214static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1215{
1216        rbd_dev->mapping.size = 0;
1217        rbd_dev->mapping.features = 0;
1218}
1219
1220static void rbd_segment_name_free(const char *name)
1221{
1222        /* The explicit cast here is needed to drop the const qualifier */
1223
1224        kmem_cache_free(rbd_segment_name_cache, (void *)name);
1225}
1226
1227static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1228{
1229        char *name;
1230        u64 segment;
1231        int ret;
1232        char *name_format;
1233
1234        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1235        if (!name)
1236                return NULL;
1237        segment = offset >> rbd_dev->header.obj_order;
1238        name_format = "%s.%012llx";
1239        if (rbd_dev->image_format == 2)
1240                name_format = "%s.%016llx";
1241        ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1242                        rbd_dev->header.object_prefix, segment);
1243        if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1244                pr_err("error formatting segment name for #%llu (%d)\n",
1245                        segment, ret);
1246                rbd_segment_name_free(name);
1247                name = NULL;
1248        }
1249
1250        return name;
1251}
1252
1253static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1254{
1255        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1256
1257        return offset & (segment_size - 1);
1258}
1259
1260static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1261                                u64 offset, u64 length)
1262{
1263        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1264
1265        offset &= segment_size - 1;
1266
1267        rbd_assert(length <= U64_MAX - offset);
1268        if (offset + length > segment_size)
1269                length = segment_size - offset;
1270
1271        return length;
1272}
1273
1274/*
1275 * returns the size of an object in the image
1276 */
1277static u64 rbd_obj_bytes(struct rbd_image_header *header)
1278{
1279        return 1 << header->obj_order;
1280}
1281
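/*
 * Worked example (illustrative): with obj_order 22 the image is cut into
 * 4 MiB objects.  An image offset of 10 MiB falls in segment
 * 10 MiB >> 22 = 2, at offset 10 MiB & (4 MiB - 1) = 2 MiB within that
 * object, and a 3 MiB request starting there is clipped by
 * rbd_segment_length() to the 2 MiB remaining before the object
 * boundary.  For a format 2 image the object would be named
 * "<object_prefix>.0000000000000002".
 */
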
1282/*
1283 * bio helpers
1284 */
1285
1286static void bio_chain_put(struct bio *chain)
1287{
1288        struct bio *tmp;
1289
1290        while (chain) {
1291                tmp = chain;
1292                chain = chain->bi_next;
1293                bio_put(tmp);
1294        }
1295}
1296
1297/*
 1298 * zeros a bio chain, starting at a specific offset
1299 */
1300static void zero_bio_chain(struct bio *chain, int start_ofs)
1301{
1302        struct bio_vec *bv;
1303        unsigned long flags;
1304        void *buf;
1305        int i;
1306        int pos = 0;
1307
1308        while (chain) {
1309                bio_for_each_segment(bv, chain, i) {
1310                        if (pos + bv->bv_len > start_ofs) {
1311                                int remainder = max(start_ofs - pos, 0);
1312                                buf = bvec_kmap_irq(bv, &flags);
1313                                memset(buf + remainder, 0,
1314                                       bv->bv_len - remainder);
1315                                flush_dcache_page(bv->bv_page);
1316                                bvec_kunmap_irq(buf, &flags);
1317                        }
1318                        pos += bv->bv_len;
1319                }
1320
1321                chain = chain->bi_next;
1322        }
1323}
1324
1325/*
1326 * similar to zero_bio_chain(), zeros data defined by a page array,
1327 * starting at the given byte offset from the start of the array and
1328 * continuing up to the given end offset.  The pages array is
1329 * assumed to be big enough to hold all bytes up to the end.
1330 */
1331static void zero_pages(struct page **pages, u64 offset, u64 end)
1332{
1333        struct page **page = &pages[offset >> PAGE_SHIFT];
1334
1335        rbd_assert(end > offset);
1336        rbd_assert(end - offset <= (u64)SIZE_MAX);
1337        while (offset < end) {
1338                size_t page_offset;
1339                size_t length;
1340                unsigned long flags;
1341                void *kaddr;
1342
1343                page_offset = offset & ~PAGE_MASK;
1344                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1345                local_irq_save(flags);
1346                kaddr = kmap_atomic(*page);
1347                memset(kaddr + page_offset, 0, length);
1348                flush_dcache_page(*page);
1349                kunmap_atomic(kaddr);
1350                local_irq_restore(flags);
1351
1352                offset += length;
1353                page++;
1354        }
1355}
1356
1357/*
1358 * Clone a portion of a bio, starting at the given byte offset
1359 * and continuing for the number of bytes indicated.
1360 */
1361static struct bio *bio_clone_range(struct bio *bio_src,
1362                                        unsigned int offset,
1363                                        unsigned int len,
1364                                        gfp_t gfpmask)
1365{
1366        struct bio_vec *bv;
1367        unsigned int resid;
1368        unsigned short idx;
1369        unsigned int voff;
1370        unsigned short end_idx;
1371        unsigned short vcnt;
1372        struct bio *bio;
1373
1374        /* Handle the easy case for the caller */
1375
1376        if (!offset && len == bio_src->bi_size)
1377                return bio_clone(bio_src, gfpmask);
1378
1379        if (WARN_ON_ONCE(!len))
1380                return NULL;
1381        if (WARN_ON_ONCE(len > bio_src->bi_size))
1382                return NULL;
1383        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1384                return NULL;
1385
1386        /* Find first affected segment... */
1387
1388        resid = offset;
1389        bio_for_each_segment(bv, bio_src, idx) {
1390                if (resid < bv->bv_len)
1391                        break;
1392                resid -= bv->bv_len;
1393        }
1394        voff = resid;
1395
1396        /* ...and the last affected segment */
1397
1398        resid += len;
1399        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1400                if (resid <= bv->bv_len)
1401                        break;
1402                resid -= bv->bv_len;
1403        }
1404        vcnt = end_idx - idx + 1;
1405
1406        /* Build the clone */
1407
1408        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1409        if (!bio)
1410                return NULL;    /* ENOMEM */
1411
1412        bio->bi_bdev = bio_src->bi_bdev;
1413        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1414        bio->bi_rw = bio_src->bi_rw;
1415        bio->bi_flags |= 1 << BIO_CLONED;
1416
1417        /*
1418         * Copy over our part of the bio_vec, then update the first
1419         * and last (or only) entries.
1420         */
1421        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1422                        vcnt * sizeof (struct bio_vec));
1423        bio->bi_io_vec[0].bv_offset += voff;
1424        if (vcnt > 1) {
1425                bio->bi_io_vec[0].bv_len -= voff;
1426                bio->bi_io_vec[vcnt - 1].bv_len = resid;
1427        } else {
1428                bio->bi_io_vec[0].bv_len = len;
1429        }
1430
1431        bio->bi_vcnt = vcnt;
1432        bio->bi_size = len;
1433        bio->bi_idx = 0;
1434
1435        return bio;
1436}
1437
1438/*
1439 * Clone a portion of a bio chain, starting at the given byte offset
1440 * into the first bio in the source chain and continuing for the
1441 * number of bytes indicated.  The result is another bio chain of
1442 * exactly the given length, or a null pointer on error.
1443 *
1444 * The bio_src and offset parameters are both in-out.  On entry they
1445 * refer to the first source bio and the offset into that bio where
1446 * the start of data to be cloned is located.
1447 *
1448 * On return, bio_src is updated to refer to the bio in the source
 1449 * chain that contains the first un-cloned byte, and *offset will
1450 * contain the offset of that byte within that bio.
1451 */
1452static struct bio *bio_chain_clone_range(struct bio **bio_src,
1453                                        unsigned int *offset,
1454                                        unsigned int len,
1455                                        gfp_t gfpmask)
1456{
1457        struct bio *bi = *bio_src;
1458        unsigned int off = *offset;
1459        struct bio *chain = NULL;
1460        struct bio **end;
1461
1462        /* Build up a chain of clone bios up to the limit */
1463
1464        if (!bi || off >= bi->bi_size || !len)
1465                return NULL;            /* Nothing to clone */
1466
1467        end = &chain;
1468        while (len) {
1469                unsigned int bi_size;
1470                struct bio *bio;
1471
1472                if (!bi) {
1473                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1474                        goto out_err;   /* EINVAL; ran out of bio's */
1475                }
1476                bi_size = min_t(unsigned int, bi->bi_size - off, len);
1477                bio = bio_clone_range(bi, off, bi_size, gfpmask);
1478                if (!bio)
1479                        goto out_err;   /* ENOMEM */
1480
1481                *end = bio;
1482                end = &bio->bi_next;
1483
1484                off += bi_size;
1485                if (off == bi->bi_size) {
1486                        bi = bi->bi_next;
1487                        off = 0;
1488                }
1489                len -= bi_size;
1490        }
1491        *bio_src = bi;
1492        *offset = off;
1493
1494        return chain;
1495out_err:
1496        bio_chain_put(chain);
1497
1498        return NULL;
1499}
1500
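/*
 * Illustrative sketch (not from the driver): peeling clones off a
 * request's bio chain one segment at a time.  *bio_src and *offset
 * advance with each call, so successive calls yield consecutive byte
 * ranges.  The function name below is hypothetical.
 */
#if 0
static int example_clone_per_segment(struct rbd_device *rbd_dev,
                                     struct bio *bio_list, u64 img_offset,
                                     u64 resid)
{
        unsigned int bio_offset = 0;

        while (resid) {
                u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
                struct bio *clone;

                clone = bio_chain_clone_range(&bio_list, &bio_offset,
                                              (unsigned int)length, GFP_NOIO);
                if (!clone)
                        return -ENOMEM;

                /* ...hand the clone to a per-object request here... */
                bio_chain_put(clone);

                img_offset += length;
                resid -= length;
        }
        return 0;
}
#endif
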
1501/*
1502 * The default/initial value for all object request flags is 0.  For
1503 * each flag, once its value is set to 1 it is never reset to 0
1504 * again.
1505 */
1506static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1507{
1508        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1509                struct rbd_device *rbd_dev;
1510
1511                rbd_dev = obj_request->img_request->rbd_dev;
1512                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1513                        obj_request);
1514        }
1515}
1516
1517static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1518{
1519        smp_mb();
1520        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1521}
1522
1523static void obj_request_done_set(struct rbd_obj_request *obj_request)
1524{
1525        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1526                struct rbd_device *rbd_dev = NULL;
1527
1528                if (obj_request_img_data_test(obj_request))
1529                        rbd_dev = obj_request->img_request->rbd_dev;
1530                rbd_warn(rbd_dev, "obj_request %p already marked done",
1531                        obj_request);
1532        }
1533}
1534
1535static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1536{
1537        smp_mb();
1538        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1539}
1540
1541/*
1542 * This sets the KNOWN flag after (possibly) setting the EXISTS
1543 * flag.  The latter is set based on the "exists" value provided.
1544 *
1545 * Note that for our purposes once an object exists it never goes
 1546 * away again.  It's possible that the responses from two existence
1547 * checks are separated by the creation of the target object, and
1548 * the first ("doesn't exist") response arrives *after* the second
1549 * ("does exist").  In that case we ignore the second one.
1550 */
1551static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1552                                bool exists)
1553{
1554        if (exists)
1555                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1556        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1557        smp_mb();
1558}
1559
1560static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1561{
1562        smp_mb();
1563        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1564}
1565
1566static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1567{
1568        smp_mb();
1569        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1570}
1571
1572static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1573{
1574        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1575
1576        return obj_request->img_offset <
1577            round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1578}
1579
1580static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1581{
1582        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1583                atomic_read(&obj_request->kref.refcount));
1584        kref_get(&obj_request->kref);
1585}
1586
1587static void rbd_obj_request_destroy(struct kref *kref);
1588static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1589{
1590        rbd_assert(obj_request != NULL);
1591        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1592                atomic_read(&obj_request->kref.refcount));
1593        kref_put(&obj_request->kref, rbd_obj_request_destroy);
1594}
1595
1596static void rbd_img_request_get(struct rbd_img_request *img_request)
1597{
1598        dout("%s: img %p (was %d)\n", __func__, img_request,
1599             atomic_read(&img_request->kref.refcount));
1600        kref_get(&img_request->kref);
1601}
1602
1603static bool img_request_child_test(struct rbd_img_request *img_request);
1604static void rbd_parent_request_destroy(struct kref *kref);
1605static void rbd_img_request_destroy(struct kref *kref);
1606static void rbd_img_request_put(struct rbd_img_request *img_request)
1607{
1608        rbd_assert(img_request != NULL);
1609        dout("%s: img %p (was %d)\n", __func__, img_request,
1610                atomic_read(&img_request->kref.refcount));
1611        if (img_request_child_test(img_request))
1612                kref_put(&img_request->kref, rbd_parent_request_destroy);
1613        else
1614                kref_put(&img_request->kref, rbd_img_request_destroy);
1615}
1616
1617static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1618                                        struct rbd_obj_request *obj_request)
1619{
1620        rbd_assert(obj_request->img_request == NULL);
1621
1622        /* Image request now owns object's original reference */
1623        obj_request->img_request = img_request;
1624        obj_request->which = img_request->obj_request_count;
1625        rbd_assert(!obj_request_img_data_test(obj_request));
1626        obj_request_img_data_set(obj_request);
1627        rbd_assert(obj_request->which != BAD_WHICH);
1628        img_request->obj_request_count++;
1629        list_add_tail(&obj_request->links, &img_request->obj_requests);
1630        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1631                obj_request->which);
1632}
1633
1634static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1635                                        struct rbd_obj_request *obj_request)
1636{
1637        rbd_assert(obj_request->which != BAD_WHICH);
1638
1639        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1640                obj_request->which);
1641        list_del(&obj_request->links);
1642        rbd_assert(img_request->obj_request_count > 0);
1643        img_request->obj_request_count--;
1644        rbd_assert(obj_request->which == img_request->obj_request_count);
1645        obj_request->which = BAD_WHICH;
1646        rbd_assert(obj_request_img_data_test(obj_request));
1647        rbd_assert(obj_request->img_request == img_request);
1648        obj_request->img_request = NULL;
1649        obj_request->callback = NULL;
1650        rbd_obj_request_put(obj_request);
1651}
1652
1653static bool obj_request_type_valid(enum obj_request_type type)
1654{
1655        switch (type) {
1656        case OBJ_REQUEST_NODATA:
1657        case OBJ_REQUEST_BIO:
1658        case OBJ_REQUEST_PAGES:
1659                return true;
1660        default:
1661                return false;
1662        }
1663}
1664
1665static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1666                                struct rbd_obj_request *obj_request)
1667{
1668        dout("%s %p\n", __func__, obj_request);
1669        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1670}
1671
1672static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1673{
1674        dout("%s %p\n", __func__, obj_request);
1675        ceph_osdc_cancel_request(obj_request->osd_req);
1676}
1677
1678/*
1679 * Wait for an object request to complete.  If interrupted, cancel the
1680 * underlying osd request.
1681 *
1682 * @timeout: in jiffies, 0 means "wait forever"
1683 */
1684static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
1685                                  unsigned long timeout)
1686{
1687        long ret;
1688
1689        dout("%s %p\n", __func__, obj_request);
1690        ret = wait_for_completion_interruptible_timeout(
1691                                        &obj_request->completion,
1692                                        ceph_timeout_jiffies(timeout));
1693        if (ret <= 0) {
1694                if (ret == 0)
1695                        ret = -ETIMEDOUT;
1696                rbd_obj_request_end(obj_request);
1697        } else {
1698                ret = 0;
1699        }
1700
1701        dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
1702        return ret;
1703}
1704
1705static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1706{
1707        return __rbd_obj_request_wait(obj_request, 0);
1708}
1709
1710static void rbd_img_request_complete(struct rbd_img_request *img_request)
1711{
1712
1713        dout("%s: img %p\n", __func__, img_request);
1714
1715        /*
1716         * If no error occurred, compute the aggregate transfer
1717         * count for the image request.  We could instead use
1718         * atomic64_cmpxchg() to update it as each object request
1719         * completes; not clear which way is better off hand.
1720         * completes; it's not clear offhand which way is better.
1721        if (!img_request->result) {
1722                struct rbd_obj_request *obj_request;
1723                u64 xferred = 0;
1724
1725                for_each_obj_request(img_request, obj_request)
1726                        xferred += obj_request->xferred;
1727                img_request->xferred = xferred;
1728        }
1729
1730        if (img_request->callback)
1731                img_request->callback(img_request);
1732        else
1733                rbd_img_request_put(img_request);
1734}
1735
1736/*
1737 * The default/initial value for all image request flags is 0.  Each
1738 * is conditionally set to 1 at image request initialization time
1739 * and currently never changes thereafter.
1740 */
1741static void img_request_write_set(struct rbd_img_request *img_request)
1742{
1743        set_bit(IMG_REQ_WRITE, &img_request->flags);
1744        smp_mb();
1745}
1746
1747static bool img_request_write_test(struct rbd_img_request *img_request)
1748{
1749        smp_mb();
1750        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1751}
1752
1753/*
1754 * Set the discard flag when the img_request is a discard request
1755 */
1756static void img_request_discard_set(struct rbd_img_request *img_request)
1757{
1758        set_bit(IMG_REQ_DISCARD, &img_request->flags);
1759        smp_mb();
1760}
1761
1762static bool img_request_discard_test(struct rbd_img_request *img_request)
1763{
1764        smp_mb();
1765        return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1766}
1767
1768static void img_request_child_set(struct rbd_img_request *img_request)
1769{
1770        set_bit(IMG_REQ_CHILD, &img_request->flags);
1771        smp_mb();
1772}
1773
1774static void img_request_child_clear(struct rbd_img_request *img_request)
1775{
1776        clear_bit(IMG_REQ_CHILD, &img_request->flags);
1777        smp_mb();
1778}
1779
1780static bool img_request_child_test(struct rbd_img_request *img_request)
1781{
1782        smp_mb();
1783        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1784}
1785
1786static void img_request_layered_set(struct rbd_img_request *img_request)
1787{
1788        set_bit(IMG_REQ_LAYERED, &img_request->flags);
1789        smp_mb();
1790}
1791
1792static void img_request_layered_clear(struct rbd_img_request *img_request)
1793{
1794        clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1795        smp_mb();
1796}
1797
1798static bool img_request_layered_test(struct rbd_img_request *img_request)
1799{
1800        smp_mb();
1801        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1802}
1803
1804static enum obj_operation_type
1805rbd_img_request_op_type(struct rbd_img_request *img_request)
1806{
1807        if (img_request_write_test(img_request))
1808                return OBJ_OP_WRITE;
1809        else if (img_request_discard_test(img_request))
1810                return OBJ_OP_DISCARD;
1811        else
1812                return OBJ_OP_READ;
1813}
1814
1815static void
1816rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1817{
1818        u64 xferred = obj_request->xferred;
1819        u64 length = obj_request->length;
1820
1821        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1822                obj_request, obj_request->img_request, obj_request->result,
1823                xferred, length);
1824        /*
1825         * ENOENT means a hole in the image.  We zero-fill the entire
1826         * length of the request.  A short read also implies zero-fill
1827         * to the end of the request.  An error requires the whole
1828         * length of the request to be reported to the block layer
1829         * as finished with an error.  In each case we update the xferred
1830         * count to indicate the whole request was satisfied.
1831         */
1832        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1833        if (obj_request->result == -ENOENT) {
1834                if (obj_request->type == OBJ_REQUEST_BIO)
1835                        zero_bio_chain(obj_request->bio_list, 0);
1836                else
1837                        zero_pages(obj_request->pages, 0, length);
1838                obj_request->result = 0;
1839        } else if (xferred < length && !obj_request->result) {
1840                if (obj_request->type == OBJ_REQUEST_BIO)
1841                        zero_bio_chain(obj_request->bio_list, xferred);
1842                else
1843                        zero_pages(obj_request->pages, xferred, length);
1844        }
1845        obj_request->xferred = length;
1846        obj_request_done_set(obj_request);
1847}
1848
1849static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1850{
1851        dout("%s: obj %p cb %p\n", __func__, obj_request,
1852                obj_request->callback);
1853        if (obj_request->callback)
1854                obj_request->callback(obj_request);
1855        else
1856                complete_all(&obj_request->completion);
1857}
1858
1859static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1860{
1861        struct rbd_img_request *img_request = NULL;
1862        struct rbd_device *rbd_dev = NULL;
1863        bool layered = false;
1864
1865        if (obj_request_img_data_test(obj_request)) {
1866                img_request = obj_request->img_request;
1867                layered = img_request && img_request_layered_test(img_request);
1868                rbd_dev = img_request->rbd_dev;
1869        }
1870
1871        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1872                obj_request, img_request, obj_request->result,
1873                obj_request->xferred, obj_request->length);
1874        if (layered && obj_request->result == -ENOENT &&
1875                        obj_request->img_offset < rbd_dev->parent_overlap)
1876                rbd_img_parent_read(obj_request);
1877        else if (img_request)
1878                rbd_img_obj_request_read_callback(obj_request);
1879        else
1880                obj_request_done_set(obj_request);
1881}
1882
1883static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1884{
1885        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1886                obj_request->result, obj_request->length);
1887        /*
1888         * There is no such thing as a successful short write.  Set
1889         * it to our originally-requested length.
1890         */
1891        obj_request->xferred = obj_request->length;
1892        obj_request_done_set(obj_request);
1893}
1894
1895static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1896{
1897        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1898                obj_request->result, obj_request->length);
1899        /*
1900         * There is no such thing as a successful short discard.  Set
1901         * it to our originally-requested length.
1902         */
1903        obj_request->xferred = obj_request->length;
1904        /* discarding a non-existent object is not a problem */
1905        if (obj_request->result == -ENOENT)
1906                obj_request->result = 0;
1907        obj_request_done_set(obj_request);
1908}
1909
1910/*
1911 * For a simple stat call there's nothing to do.  We'll do more if
1912 * this is part of a write sequence for a layered image.
1913 */
1914static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1915{
1916        dout("%s: obj %p\n", __func__, obj_request);
1917        obj_request_done_set(obj_request);
1918}
1919
1920static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1921{
1922        dout("%s: obj %p\n", __func__, obj_request);
1923
1924        if (obj_request_img_data_test(obj_request))
1925                rbd_osd_copyup_callback(obj_request);
1926        else
1927                obj_request_done_set(obj_request);
1928}
1929
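/*
 * Completion callback for every osd request issued on behalf of an
 * object request.  The result and transfer count are taken from the
 * first op, and the callback dispatches on that op's opcode; data
 * writes carry the allocation hint as op 0 and the write as op 1.
 */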
1930static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1931{
1932        struct rbd_obj_request *obj_request = osd_req->r_priv;
1933        u16 opcode;
1934
1935        dout("%s: osd_req %p\n", __func__, osd_req);
1936        rbd_assert(osd_req == obj_request->osd_req);
1937        if (obj_request_img_data_test(obj_request)) {
1938                rbd_assert(obj_request->img_request);
1939                rbd_assert(obj_request->which != BAD_WHICH);
1940        } else {
1941                rbd_assert(obj_request->which == BAD_WHICH);
1942        }
1943
1944        if (osd_req->r_result < 0)
1945                obj_request->result = osd_req->r_result;
1946
1947        /*
1948         * We support a 64-bit length, but ultimately it has to be
1949         * passed to the block layer, which just supports a 32-bit
1950         * length field.
1951         */
1952        obj_request->xferred = osd_req->r_ops[0].outdata_len;
1953        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1954
1955        opcode = osd_req->r_ops[0].op;
1956        switch (opcode) {
1957        case CEPH_OSD_OP_READ:
1958                rbd_osd_read_callback(obj_request);
1959                break;
1960        case CEPH_OSD_OP_SETALLOCHINT:
1961                rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1962                           osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1963                /* fall through */
1964        case CEPH_OSD_OP_WRITE:
1965        case CEPH_OSD_OP_WRITEFULL:
1966                rbd_osd_write_callback(obj_request);
1967                break;
1968        case CEPH_OSD_OP_STAT:
1969                rbd_osd_stat_callback(obj_request);
1970                break;
1971        case CEPH_OSD_OP_DELETE:
1972        case CEPH_OSD_OP_TRUNCATE:
1973        case CEPH_OSD_OP_ZERO:
1974                rbd_osd_discard_callback(obj_request);
1975                break;
1976        case CEPH_OSD_OP_CALL:
1977                rbd_osd_call_callback(obj_request);
1978                break;
1979        default:
1980                rbd_warn(NULL, "%s: unsupported op %hu",
1981                        obj_request->object_name, (unsigned short) opcode);
1982                break;
1983        }
1984
1985        if (obj_request_done_test(obj_request))
1986                rbd_obj_request_complete(obj_request);
1987}
1988
1989static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1990{
1991        struct rbd_img_request *img_request = obj_request->img_request;
1992        struct ceph_osd_request *osd_req = obj_request->osd_req;
1993
1994        if (img_request)
1995                osd_req->r_snapid = img_request->snap_id;
1996}
1997
1998static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1999{
2000        struct ceph_osd_request *osd_req = obj_request->osd_req;
2001
2002        osd_req->r_mtime = CURRENT_TIME;
2003        osd_req->r_data_offset = obj_request->offset;
2004}
2005
2006/*
2007 * Create an osd request.  A read request has one osd op (read).
2008 * A write request has either one (watch) or two (hint+write) osd ops.
2009 * (All rbd data writes are prefixed with an allocation hint op, but
2010 * technically osd watch is a write request, hence this distinction.)
2011 */
2012static struct ceph_osd_request *rbd_osd_req_create(
2013                                        struct rbd_device *rbd_dev,
2014                                        enum obj_operation_type op_type,
2015                                        unsigned int num_ops,
2016                                        struct rbd_obj_request *obj_request)
2017{
2018        struct ceph_snap_context *snapc = NULL;
2019        struct ceph_osd_client *osdc;
2020        struct ceph_osd_request *osd_req;
2021
2022        if (obj_request_img_data_test(obj_request) &&
2023                (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
2024                struct rbd_img_request *img_request = obj_request->img_request;
2025                if (op_type == OBJ_OP_WRITE) {
2026                        rbd_assert(img_request_write_test(img_request));
2027                } else {
2028                        rbd_assert(img_request_discard_test(img_request));
2029                }
2030                snapc = img_request->snapc;
2031        }
2032
2033        rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
2034
2035        /* Allocate and initialize the request, for the num_ops ops */
2036
2037        osdc = &rbd_dev->rbd_client->client->osdc;
2038        osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2039                                          GFP_NOIO);
2040        if (!osd_req)
2041                goto fail;
2042
2043        if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2044                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2045        else
2046                osd_req->r_flags = CEPH_OSD_FLAG_READ;
2047
2048        osd_req->r_callback = rbd_osd_req_callback;
2049        osd_req->r_priv = obj_request;
2050
2051        osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
2052        if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2053                             obj_request->object_name))
2054                goto fail;
2055
2056        if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2057                goto fail;
2058
2059        return osd_req;
2060
2061fail:
2062        ceph_osdc_put_request(osd_req);
2063        return NULL;
2064}
2065
2066/*
2067 * Create a copyup osd request based on the information in the object
2068 * request supplied.  A copyup request has two or three osd ops: a
2069 * copyup method call, potentially a hint op, and a write, truncate,
2070 * or zero op.
2071 */
2072static struct ceph_osd_request *
2073rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2074{
2075        struct rbd_img_request *img_request;
2076        struct ceph_snap_context *snapc;
2077        struct rbd_device *rbd_dev;
2078        struct ceph_osd_client *osdc;
2079        struct ceph_osd_request *osd_req;
2080        int num_osd_ops = 3;
2081
2082        rbd_assert(obj_request_img_data_test(obj_request));
2083        img_request = obj_request->img_request;
2084        rbd_assert(img_request);
2085        rbd_assert(img_request_write_test(img_request) ||
2086                        img_request_discard_test(img_request));
2087
2088        if (img_request_discard_test(img_request))
2089                num_osd_ops = 2;
2090
2091        /* Allocate and initialize the request, for all the ops */
2092
2093        snapc = img_request->snapc;
2094        rbd_dev = img_request->rbd_dev;
2095        osdc = &rbd_dev->rbd_client->client->osdc;
2096        osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2097                                                false, GFP_NOIO);
2098        if (!osd_req)
2099                goto fail;
2100
2101        osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2102        osd_req->r_callback = rbd_osd_req_callback;
2103        osd_req->r_priv = obj_request;
2104
2105        osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
2106        if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2107                             obj_request->object_name))
2108                goto fail;
2109
2110        if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2111                goto fail;
2112
2113        return osd_req;
2114
2115fail:
2116        ceph_osdc_put_request(osd_req);
2117        return NULL;
2118}
2119
2120
2121static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2122{
2123        ceph_osdc_put_request(osd_req);
2124}
2125
2126/* object_name is assumed to be a non-null pointer and NUL-terminated */
2127
2128static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2129                                                u64 offset, u64 length,
2130                                                enum obj_request_type type)
2131{
2132        struct rbd_obj_request *obj_request;
2133        size_t size;
2134        char *name;
2135
2136        rbd_assert(obj_request_type_valid(type));
2137
2138        size = strlen(object_name) + 1;
2139        name = kmalloc(size, GFP_NOIO);
2140        if (!name)
2141                return NULL;
2142
2143        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2144        if (!obj_request) {
2145                kfree(name);
2146                return NULL;
2147        }
2148
2149        obj_request->object_name = memcpy(name, object_name, size);
2150        obj_request->offset = offset;
2151        obj_request->length = length;
2152        obj_request->flags = 0;
2153        obj_request->which = BAD_WHICH;
2154        obj_request->type = type;
2155        INIT_LIST_HEAD(&obj_request->links);
2156        init_completion(&obj_request->completion);
2157        kref_init(&obj_request->kref);
2158
2159        dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2160                offset, length, (int)type, obj_request);
2161
2162        return obj_request;
2163}
2164
2165static void rbd_obj_request_destroy(struct kref *kref)
2166{
2167        struct rbd_obj_request *obj_request;
2168
2169        obj_request = container_of(kref, struct rbd_obj_request, kref);
2170
2171        dout("%s: obj %p\n", __func__, obj_request);
2172
2173        rbd_assert(obj_request->img_request == NULL);
2174        rbd_assert(obj_request->which == BAD_WHICH);
2175
2176        if (obj_request->osd_req)
2177                rbd_osd_req_destroy(obj_request->osd_req);
2178
2179        rbd_assert(obj_request_type_valid(obj_request->type));
2180        switch (obj_request->type) {
2181        case OBJ_REQUEST_NODATA:
2182                break;          /* Nothing to do */
2183        case OBJ_REQUEST_BIO:
2184                if (obj_request->bio_list)
2185                        bio_chain_put(obj_request->bio_list);
2186                break;
2187        case OBJ_REQUEST_PAGES:
2188                if (obj_request->pages)
2189                        ceph_release_page_vector(obj_request->pages,
2190                                                obj_request->page_count);
2191                break;
2192        }
2193
2194        kfree(obj_request->object_name);
2195        obj_request->object_name = NULL;
2196        kmem_cache_free(rbd_obj_request_cache, obj_request);
2197}
2198
2199/* It's OK to call this for a device with no parent */
2200
2201static void rbd_spec_put(struct rbd_spec *spec);
2202static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2203{
2204        rbd_dev_remove_parent(rbd_dev);
2205        rbd_spec_put(rbd_dev->parent_spec);
2206        rbd_dev->parent_spec = NULL;
2207        rbd_dev->parent_overlap = 0;
2208}
2209
2210/*
2211 * Parent image reference counting is used to determine when an
2212 * image's parent fields can be safely torn down--after there are no
2213 * more in-flight requests to the parent image.  When the last
2214 * reference is dropped, cleaning them up is safe.
2215 */
2216static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2217{
2218        int counter;
2219
2220        if (!rbd_dev->parent_spec)
2221                return;
2222
2223        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2224        if (counter > 0)
2225                return;
2226
2227        /* Last reference; clean up parent data structures */
2228
2229        if (!counter)
2230                rbd_dev_unparent(rbd_dev);
2231        else
2232                rbd_warn(rbd_dev, "parent reference underflow");
2233}
2234
2235/*
2236 * If an image has a non-zero parent overlap, get a reference to its
2237 * parent.
2238 *
2239 * Returns true if the rbd device has a parent with a non-zero
2240 * overlap and a reference for it was successfully taken, or
2241 * false otherwise.
2242 */
2243static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2244{
2245        int counter = 0;
2246
2247        if (!rbd_dev->parent_spec)
2248                return false;
2249
2250        down_read(&rbd_dev->header_rwsem);
2251        if (rbd_dev->parent_overlap)
2252                counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2253        up_read(&rbd_dev->header_rwsem);
2254
2255        if (counter < 0)
2256                rbd_warn(rbd_dev, "parent reference overflow");
2257
2258        return counter > 0;
2259}
2260
2261/*
2262 * Caller is responsible for filling in the list of object requests
2263 * that comprises the image request, and the Linux request pointer
2264 * (if there is one).
2265 */
2266static struct rbd_img_request *rbd_img_request_create(
2267                                        struct rbd_device *rbd_dev,
2268                                        u64 offset, u64 length,
2269                                        enum obj_operation_type op_type,
2270                                        struct ceph_snap_context *snapc)
2271{
2272        struct rbd_img_request *img_request;
2273
2274        img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2275        if (!img_request)
2276                return NULL;
2277
2278        img_request->rq = NULL;
2279        img_request->rbd_dev = rbd_dev;
2280        img_request->offset = offset;
2281        img_request->length = length;
2282        img_request->flags = 0;
2283        if (op_type == OBJ_OP_DISCARD) {
2284                img_request_discard_set(img_request);
2285                img_request->snapc = snapc;
2286        } else if (op_type == OBJ_OP_WRITE) {
2287                img_request_write_set(img_request);
2288                img_request->snapc = snapc;
2289        } else {
2290                img_request->snap_id = rbd_dev->spec->snap_id;
2291        }
2292        if (rbd_dev_parent_get(rbd_dev))
2293                img_request_layered_set(img_request);
2294        spin_lock_init(&img_request->completion_lock);
2295        img_request->next_completion = 0;
2296        img_request->callback = NULL;
2297        img_request->result = 0;
2298        img_request->obj_request_count = 0;
2299        INIT_LIST_HEAD(&img_request->obj_requests);
2300        kref_init(&img_request->kref);
2301
2302        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2303                obj_op_name(op_type), offset, length, img_request);
2304
2305        return img_request;
2306}
2307
2308static void rbd_img_request_destroy(struct kref *kref)
2309{
2310        struct rbd_img_request *img_request;
2311        struct rbd_obj_request *obj_request;
2312        struct rbd_obj_request *next_obj_request;
2313
2314        img_request = container_of(kref, struct rbd_img_request, kref);
2315
2316        dout("%s: img %p\n", __func__, img_request);
2317
2318        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2319                rbd_img_obj_request_del(img_request, obj_request);
2320        rbd_assert(img_request->obj_request_count == 0);
2321
2322        if (img_request_layered_test(img_request)) {
2323                img_request_layered_clear(img_request);
2324                rbd_dev_parent_put(img_request->rbd_dev);
2325        }
2326
2327        if (img_request_write_test(img_request) ||
2328                img_request_discard_test(img_request))
2329                ceph_put_snap_context(img_request->snapc);
2330
2331        kmem_cache_free(rbd_img_request_cache, img_request);
2332}
2333
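/*
 * A "parent" image request reads from the parent image on behalf of
 * a single object request.  It is marked IMG_REQ_CHILD, holds a
 * reference on that object request, and drops it when destroyed.
 */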
2334static struct rbd_img_request *rbd_parent_request_create(
2335                                        struct rbd_obj_request *obj_request,
2336                                        u64 img_offset, u64 length)
2337{
2338        struct rbd_img_request *parent_request;
2339        struct rbd_device *rbd_dev;
2340
2341        rbd_assert(obj_request->img_request);
2342        rbd_dev = obj_request->img_request->rbd_dev;
2343
2344        parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2345                                                length, OBJ_OP_READ, NULL);
2346        if (!parent_request)
2347                return NULL;
2348
2349        img_request_child_set(parent_request);
2350        rbd_obj_request_get(obj_request);
2351        parent_request->obj_request = obj_request;
2352
2353        return parent_request;
2354}
2355
2356static void rbd_parent_request_destroy(struct kref *kref)
2357{
2358        struct rbd_img_request *parent_request;
2359        struct rbd_obj_request *orig_request;
2360
2361        parent_request = container_of(kref, struct rbd_img_request, kref);
2362        orig_request = parent_request->obj_request;
2363
2364        parent_request->obj_request = NULL;
2365        rbd_obj_request_put(orig_request);
2366        img_request_child_clear(parent_request);
2367
2368        rbd_img_request_destroy(kref);
2369}
2370
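/*
 * Finish accounting for one completed object request: record the
 * first error (if any) in the image request and, unless this is a
 * child request, report the byte range to the block layer.  Returns
 * true if more object requests remain to be completed.
 */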
2371static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2372{
2373        struct rbd_img_request *img_request;
2374        unsigned int xferred;
2375        int result;
2376        bool more;
2377
2378        rbd_assert(obj_request_img_data_test(obj_request));
2379        img_request = obj_request->img_request;
2380
2381        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2382        xferred = (unsigned int)obj_request->xferred;
2383        result = obj_request->result;
2384        if (result) {
2385                struct rbd_device *rbd_dev = img_request->rbd_dev;
2386                enum obj_operation_type op_type;
2387
2388                if (img_request_discard_test(img_request))
2389                        op_type = OBJ_OP_DISCARD;
2390                else if (img_request_write_test(img_request))
2391                        op_type = OBJ_OP_WRITE;
2392                else
2393                        op_type = OBJ_OP_READ;
2394
2395                rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2396                        obj_op_name(op_type), obj_request->length,
2397                        obj_request->img_offset, obj_request->offset);
2398                rbd_warn(rbd_dev, "  result %d xferred %x",
2399                        result, xferred);
2400                if (!img_request->result)
2401                        img_request->result = result;
2402                /*
2403                 * Need to end I/O on the entire obj_request worth of
2404                 * bytes in case of error.
2405                 */
2406                xferred = obj_request->length;
2407        }
2408
2409        /* Image object requests don't own their page array */
2410
2411        if (obj_request->type == OBJ_REQUEST_PAGES) {
2412                obj_request->pages = NULL;
2413                obj_request->page_count = 0;
2414        }
2415
2416        if (img_request_child_test(img_request)) {
2417                rbd_assert(img_request->obj_request != NULL);
2418                more = obj_request->which < img_request->obj_request_count - 1;
2419        } else {
2420                rbd_assert(img_request->rq != NULL);
2421
2422                more = blk_update_request(img_request->rq, result, xferred);
2423                if (!more)
2424                        __blk_mq_end_request(img_request->rq, result);
2425        }
2426
2427        return more;
2428}
2429
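/*
 * Per-object completion callback for image requests.  Object requests
 * may complete out of order, but they are accounted for in submission
 * order: next_completion advances past each consecutive completed
 * request, and the image request completes once the last one has been
 * accounted for.
 */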
2430static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2431{
2432        struct rbd_img_request *img_request;
2433        u32 which = obj_request->which;
2434        bool more = true;
2435
2436        rbd_assert(obj_request_img_data_test(obj_request));
2437        img_request = obj_request->img_request;
2438
2439        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2440        rbd_assert(img_request != NULL);
2441        rbd_assert(img_request->obj_request_count > 0);
2442        rbd_assert(which != BAD_WHICH);
2443        rbd_assert(which < img_request->obj_request_count);
2444
2445        spin_lock_irq(&img_request->completion_lock);
2446        if (which != img_request->next_completion)
2447                goto out;
2448
2449        for_each_obj_request_from(img_request, obj_request) {
2450                rbd_assert(more);
2451                rbd_assert(which < img_request->obj_request_count);
2452
2453                if (!obj_request_done_test(obj_request))
2454                        break;
2455                more = rbd_img_obj_end_request(obj_request);
2456                which++;
2457        }
2458
2459        rbd_assert(more ^ (which == img_request->obj_request_count));
2460        img_request->next_completion = which;
2461out:
2462        spin_unlock_irq(&img_request->completion_lock);
2463        rbd_img_request_put(img_request);
2464
2465        if (!more)
2466                rbd_img_request_complete(img_request);
2467}
2468
2469/*
2470 * Add individual osd ops to the given ceph_osd_request and prepare
2471 * them for submission.  num_ops is the number of osd operations
2472 * already added to the given osd request.
2473 */
2474static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2475                                struct ceph_osd_request *osd_request,
2476                                enum obj_operation_type op_type,
2477                                unsigned int num_ops)
2478{
2479        struct rbd_img_request *img_request = obj_request->img_request;
2480        struct rbd_device *rbd_dev = img_request->rbd_dev;
2481        u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2482        u64 offset = obj_request->offset;
2483        u64 length = obj_request->length;
2484        u64 img_end;
2485        u16 opcode;
2486
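        /*
         * For a discard, pick the opcode: delete the object when the
         * request covers it entirely and no parent data would show
         * through, truncate when the range runs to the end of the
         * object (or of the image), and zero otherwise.
         */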
2487        if (op_type == OBJ_OP_DISCARD) {
2488                if (!offset && length == object_size &&
2489                    (!img_request_layered_test(img_request) ||
2490                     !obj_request_overlaps_parent(obj_request))) {
2491                        opcode = CEPH_OSD_OP_DELETE;
2492                } else if (offset + length == object_size) {
2493                        opcode = CEPH_OSD_OP_TRUNCATE;
2494                } else {
2495                        down_read(&rbd_dev->header_rwsem);
2496                        img_end = rbd_dev->header.image_size;
2497                        up_read(&rbd_dev->header_rwsem);
2498
2499                        if (obj_request->img_offset + length == img_end)
2500                                opcode = CEPH_OSD_OP_TRUNCATE;
2501                        else
2502                                opcode = CEPH_OSD_OP_ZERO;
2503                }
2504        } else if (op_type == OBJ_OP_WRITE) {
2505                if (!offset && length == object_size)
2506                        opcode = CEPH_OSD_OP_WRITEFULL;
2507                else
2508                        opcode = CEPH_OSD_OP_WRITE;
2509                osd_req_op_alloc_hint_init(osd_request, num_ops,
2510                                        object_size, object_size);
2511                num_ops++;
2512        } else {
2513                opcode = CEPH_OSD_OP_READ;
2514        }
2515
2516        if (opcode == CEPH_OSD_OP_DELETE)
2517                osd_req_op_init(osd_request, num_ops, opcode, 0);
2518        else
2519                osd_req_op_extent_init(osd_request, num_ops, opcode,
2520                                       offset, length, 0, 0);
2521
2522        if (obj_request->type == OBJ_REQUEST_BIO)
2523                osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2524                                        obj_request->bio_list, length);
2525        else if (obj_request->type == OBJ_REQUEST_PAGES)
2526                osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2527                                        obj_request->pages, length,
2528                                        offset & ~PAGE_MASK, false, false);
2529
2530        /* Discards are also writes */
2531        if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2532                rbd_osd_req_format_write(obj_request);
2533        else
2534                rbd_osd_req_format_read(obj_request);
2535}
2536
2537/*
2538 * Split up an image request into one or more object requests, each
2539 * to a different object.  The "type" parameter indicates whether
2540 * "data_desc" is the pointer to the head of a list of bio
2541 * structures, or the base of a page array.  In either case this
2542 * function assumes data_desc describes memory sufficient to hold
2543 * all data described by the image request.
2544 */
2545static int rbd_img_request_fill(struct rbd_img_request *img_request,
2546                                        enum obj_request_type type,
2547                                        void *data_desc)
2548{
2549        struct rbd_device *rbd_dev = img_request->rbd_dev;
2550        struct rbd_obj_request *obj_request = NULL;
2551        struct rbd_obj_request *next_obj_request;
2552        struct bio *bio_list = NULL;
2553        unsigned int bio_offset = 0;
2554        struct page **pages = NULL;
2555        enum obj_operation_type op_type;
2556        u64 img_offset;
2557        u64 resid;
2558
2559        dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2560                (int)type, data_desc);
2561
2562        img_offset = img_request->offset;
2563        resid = img_request->length;
2564        rbd_assert(resid > 0);
2565        op_type = rbd_img_request_op_type(img_request);
2566
2567        if (type == OBJ_REQUEST_BIO) {
2568                bio_list = data_desc;
2569                rbd_assert(img_offset == bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2570        } else if (type == OBJ_REQUEST_PAGES) {
2571                pages = data_desc;
2572        }
2573
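        /*
         * Walk the image extent one backing object at a time: map
         * [img_offset, img_offset + length) to an object name, offset
         * and length, attach the matching slice of the bio chain (or
         * advance through the page array), and build an osd request
         * for that object.
         */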
2574        while (resid) {
2575                struct ceph_osd_request *osd_req;
2576                const char *object_name;
2577                u64 offset;
2578                u64 length;
2579
2580                object_name = rbd_segment_name(rbd_dev, img_offset);
2581                if (!object_name)
2582                        goto out_unwind;
2583                offset = rbd_segment_offset(rbd_dev, img_offset);
2584                length = rbd_segment_length(rbd_dev, img_offset, resid);
2585                obj_request = rbd_obj_request_create(object_name,
2586                                                offset, length, type);
2587                /* object request has its own copy of the object name */
2588                rbd_segment_name_free(object_name);
2589                if (!obj_request)
2590                        goto out_unwind;
2591
2592                /*
2593                 * set obj_request->img_request before creating the
2594                 * osd_request so that it gets the right snapc
2595                 */
2596                rbd_img_obj_request_add(img_request, obj_request);
2597
2598                if (type == OBJ_REQUEST_BIO) {
2599                        unsigned int clone_size;
2600
2601                        rbd_assert(length <= (u64)UINT_MAX);
2602                        clone_size = (unsigned int)length;
2603                        obj_request->bio_list =
2604                                        bio_chain_clone_range(&bio_list,
2605                                                                &bio_offset,
2606                                                                clone_size,
2607                                                                GFP_NOIO);
2608                        if (!obj_request->bio_list)
2609                                goto out_unwind;
2610                } else if (type == OBJ_REQUEST_PAGES) {
2611                        unsigned int page_count;
2612
2613                        obj_request->pages = pages;
2614                        page_count = (u32)calc_pages_for(offset, length);
2615                        obj_request->page_count = page_count;
2616                        if ((offset + length) & ~PAGE_MASK)
2617                                page_count--;   /* more on last page */
2618                        pages += page_count;
2619                }
2620
2621                osd_req = rbd_osd_req_create(rbd_dev, op_type,
2622                                        (op_type == OBJ_OP_WRITE) ? 2 : 1,
2623                                        obj_request);
2624                if (!osd_req)
2625                        goto out_unwind;
2626
2627                obj_request->osd_req = osd_req;
2628                obj_request->callback = rbd_img_obj_callback;
2629                obj_request->img_offset = img_offset;
2630
2631                rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2632
2633                rbd_img_request_get(img_request);
2634
2635                img_offset += length;
2636                resid -= length;
2637        }
2638
2639        return 0;
2640
2641out_unwind:
2642        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2643                rbd_img_obj_request_del(img_request, obj_request);
2644
2645        return -ENOMEM;
2646}
2647
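/*
 * Completion handler for a copyup request issued on behalf of a
 * layered write: release the page vector that held the parent data
 * and, on success, report the full originally-requested length as
 * transferred.
 */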
2648static void
2649rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2650{
2651        struct rbd_img_request *img_request;
2652        struct rbd_device *rbd_dev;
2653        struct page **pages;
2654        u32 page_count;
2655
2656        dout("%s: obj %p\n", __func__, obj_request);
2657
2658        rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2659                obj_request->type == OBJ_REQUEST_NODATA);
2660        rbd_assert(obj_request_img_data_test(obj_request));
2661        img_request = obj_request->img_request;
2662        rbd_assert(img_request);
2663
2664        rbd_dev = img_request->rbd_dev;
2665        rbd_assert(rbd_dev);
2666
2667        pages = obj_request->copyup_pages;
2668        rbd_assert(pages != NULL);
2669        obj_request->copyup_pages = NULL;
2670        page_count = obj_request->copyup_page_count;
2671        rbd_assert(page_count);
2672        obj_request->copyup_page_count = 0;
2673        ceph_release_page_vector(pages, page_count);
2674
2675        /*
2676         * We want the transfer count to reflect the size of the
2677         * original write request.  There is no such thing as a
2678         * successful short write, so if the request was successful
2679         * we can just set it to the originally-requested length.
2680         */
2681        if (!obj_request->result)
2682                obj_request->xferred = obj_request->length;
2683
2684        obj_request_done_set(obj_request);
2685}
2686
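/*
 * Called when the full-object read from the parent completes.  Takes
 * ownership of the copyup page vector, replaces the original object
 * request's osd request with a copyup request (the "copyup" class
 * method call followed by the original write/discard ops) and
 * resubmits it.  If the parent overlap has meanwhile dropped to zero,
 * the original request is resubmitted unchanged.
 */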
2687static void
2688rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2689{
2690        struct rbd_obj_request *orig_request;
2691        struct ceph_osd_request *osd_req;
2692        struct ceph_osd_client *osdc;
2693        struct rbd_device *rbd_dev;
2694        struct page **pages;
2695        enum obj_operation_type op_type;
2696        u32 page_count;
2697        int img_result;
2698        u64 parent_length;
2699
2700        rbd_assert(img_request_child_test(img_request));
2701
2702        /* First get what we need from the image request */
2703
2704        pages = img_request->copyup_pages;
2705        rbd_assert(pages != NULL);
2706        img_request->copyup_pages = NULL;
2707        page_count = img_request->copyup_page_count;
2708        rbd_assert(page_count);
2709        img_request->copyup_page_count = 0;
2710
2711        orig_request = img_request->obj_request;
2712        rbd_assert(orig_request != NULL);
2713        rbd_assert(obj_request_type_valid(orig_request->type));
2714        img_result = img_request->result;
2715        parent_length = img_request->length;
2716        rbd_assert(parent_length == img_request->xferred);
2717        rbd_img_request_put(img_request);
2718
2719        rbd_assert(orig_request->img_request);
2720        rbd_dev = orig_request->img_request->rbd_dev;
2721        rbd_assert(rbd_dev);
2722
2723        /*
2724         * If the overlap has become 0 (most likely because the
2725         * image has been flattened) we need to free the pages
2726         * and re-submit the original write request.
2727         */
2728        if (!rbd_dev->parent_overlap) {
2729                struct ceph_osd_client *osdc;
2730
2731                ceph_release_page_vector(pages, page_count);
2732                osdc = &rbd_dev->rbd_client->client->osdc;
2733                img_result = rbd_obj_request_submit(osdc, orig_request);
2734                if (!img_result)
2735                        return;
2736        }
2737
2738        if (img_result)
2739                goto out_err;
2740
2741        /*
2742         * The original osd request is of no use to us any more.
2743         * We need a new one that can hold the two or three ops in
2744         * a copyup request.  Allocate the new copyup osd request
2745         * for the original request, and release the old one.
2746         */
2747        img_result = -ENOMEM;
2748        osd_req = rbd_osd_req_create_copyup(orig_request);
2749        if (!osd_req)
2750                goto out_err;
2751        rbd_osd_req_destroy(orig_request->osd_req);
2752        orig_request->osd_req = osd_req;
2753        orig_request->copyup_pages = pages;
2754        orig_request->copyup_page_count = page_count;
2755
2756        /* Initialize the copyup op */
2757
2758        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2759        osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2760                                                false, false);
2761
2762        /* Add the other op(s) */
2763
2764        op_type = rbd_img_request_op_type(orig_request->img_request);
2765        rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2766
2767        /* All set, send it off. */
2768
2769        osdc = &rbd_dev->rbd_client->client->osdc;
2770        img_result = rbd_obj_request_submit(osdc, orig_request);
2771        if (!img_result)
2772                return;
2773out_err:
2774        /* Record the error code and complete the request */
2775
2776        orig_request->result = img_result;
2777        orig_request->xferred = 0;
2778        obj_request_done_set(orig_request);
2779        rbd_obj_request_complete(orig_request);
2780}
2781
2782/*
2783 * Read from the parent image the range of data that covers the
2784 * entire target of the given object request.  This is used for
2785 * satisfying a layered image write request when the target of an
2786 * object request from the image request does not exist.
2787 *
2788 * A page array big enough to hold the returned data is allocated
2789 * and supplied to rbd_img_request_fill() as the "data descriptor."
2790 * When the read completes, this page array will be transferred to
2791 * the original object request for the copyup operation.
2792 *
2793 * If an error occurs, record it as the result of the original
2794 * object request and mark it done so it gets completed.
2795 */
2796static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2797{
2798        struct rbd_img_request *img_request = NULL;
2799        struct rbd_img_request *parent_request = NULL;
2800        struct rbd_device *rbd_dev;
2801        u64 img_offset;
2802        u64 length;
2803        struct page **pages = NULL;
2804        u32 page_count;
2805        int result;
2806
2807        rbd_assert(obj_request_img_data_test(obj_request));
2808        rbd_assert(obj_request_type_valid(obj_request->type));
2809
2810        img_request = obj_request->img_request;
2811        rbd_assert(img_request != NULL);
2812        rbd_dev = img_request->rbd_dev;
2813        rbd_assert(rbd_dev->parent != NULL);
2814
2815        /*
2816         * Determine the byte range covered by the object in the
2817         * child image to which the original request was to be sent.
2818         */
2819        img_offset = obj_request->img_offset - obj_request->offset;
2820        length = (u64)1 << rbd_dev->header.obj_order;
2821
2822        /*
2823         * There is no defined parent data beyond the parent
2824         * overlap, so limit what we read at that boundary if
2825         * necessary.
2826         */
2827        if (img_offset + length > rbd_dev->parent_overlap) {
2828                rbd_assert(img_offset < rbd_dev->parent_overlap);
2829                length = rbd_dev->parent_overlap - img_offset;
2830        }
2831
2832        /*
2833         * Allocate a page array big enough to receive the data read
2834         * from the parent.
2835         */
2836        page_count = (u32)calc_pages_for(0, length);
2837        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2838        if (IS_ERR(pages)) {
2839                result = PTR_ERR(pages);
2840                pages = NULL;
2841                goto out_err;
2842        }
2843
2844        result = -ENOMEM;
2845        parent_request = rbd_parent_request_create(obj_request,
2846                                                img_offset, length);
2847        if (!parent_request)
2848                goto out_err;
2849
2850        result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2851        if (result)
2852                goto out_err;
2853        parent_request->copyup_pages = pages;
2854        parent_request->copyup_page_count = page_count;
2855
2856        parent_request->callback = rbd_img_obj_parent_read_full_callback;
2857        result = rbd_img_request_submit(parent_request);
2858        if (!result)
2859                return 0;
2860
2861        parent_request->copyup_pages = NULL;
2862        parent_request->copyup_page_count = 0;
2863        parent_request->obj_request = NULL;
2864        rbd_obj_request_put(obj_request);
2865out_err:
2866        if (pages)
2867                ceph_release_page_vector(pages, page_count);
2868        if (parent_request)
2869                rbd_img_request_put(parent_request);
2870        obj_request->result = result;
2871        obj_request->xferred = 0;
2872        obj_request_done_set(obj_request);
2873
2874        return result;
2875}
2876
2877static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2878{
2879        struct rbd_obj_request *orig_request;
2880        struct rbd_device *rbd_dev;
2881        int result;
2882
2883        rbd_assert(!obj_request_img_data_test(obj_request));
2884
2885        /*
2886         * All we need from the object request is the original
2887         * request and the result of the STAT op.  Grab those, then
2888         * we're done with the request.
2889         */
2890        orig_request = obj_request->obj_request;
2891        obj_request->obj_request = NULL;
2892        rbd_obj_request_put(orig_request);
2893        rbd_assert(orig_request);
2894        rbd_assert(orig_request->img_request);
2895
2896        result = obj_request->result;
2897        obj_request->result = 0;
2898
2899        dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2900                obj_request, orig_request, result,
2901                obj_request->xferred, obj_request->length);
2902        rbd_obj_request_put(obj_request);
2903
2904        /*
2905         * If the overlap has become 0 (most likely because the
2906         * image has been flattened) we need to re-submit the
2907         * original write request.
2908         */
2909        rbd_dev = orig_request->img_request->rbd_dev;
2910        if (!rbd_dev->parent_overlap) {
2911                struct ceph_osd_client *osdc;
2912
2913                osdc = &rbd_dev->rbd_client->client->osdc;
2914                result = rbd_obj_request_submit(osdc, orig_request);
2915                if (!result)
2916                        return;
2917        }
2918
2919        /*
2920         * Our only purpose here is to determine whether the object
2921         * exists, and we don't want to treat the non-existence as
2922         * an error.  If something else comes back, transfer the
2923         * error to the original request and complete it now.
2924         */
2925        if (!result) {
2926                obj_request_existence_set(orig_request, true);
2927        } else if (result == -ENOENT) {
2928                obj_request_existence_set(orig_request, false);
2929        } else if (result) {
2930                orig_request->result = result;
2931                goto out;
2932        }
2933
2934        /*
2935         * Resubmit the original request now that we have recorded
2936         * whether the target object exists.
2937         */
2938        orig_request->result = rbd_img_obj_request_submit(orig_request);
2939out:
2940        if (orig_request->result)
2941                rbd_obj_request_complete(orig_request);
2942}
2943
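/*
 * Issue a STAT against the target object of a layered write to learn
 * whether it already exists.  The callback records the answer via
 * obj_request_existence_set() and then resubmits the original object
 * request.
 */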
2944static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2945{
2946        struct rbd_obj_request *stat_request;
2947        struct rbd_device *rbd_dev;
2948        struct ceph_osd_client *osdc;
2949        struct page **pages = NULL;
2950        u32 page_count;
2951        size_t size;
2952        int ret;
2953
2954        /*
2955         * The response data for a STAT call consists of:
2956         *     le64 length;
2957         *     struct {
2958         *         le32 tv_sec;
2959         *         le32 tv_nsec;
2960         *     } mtime;
2961         */
2962        size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2963        page_count = (u32)calc_pages_for(0, size);
2964        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2965        if (IS_ERR(pages))
2966                return PTR_ERR(pages);
2967
2968        ret = -ENOMEM;
2969        stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2970                                                        OBJ_REQUEST_PAGES);
2971        if (!stat_request)
2972                goto out;
2973
2974        rbd_obj_request_get(obj_request);
2975        stat_request->obj_request = obj_request;
2976        stat_request->pages = pages;
2977        stat_request->page_count = page_count;
2978
2979        rbd_assert(obj_request->img_request);
2980        rbd_dev = obj_request->img_request->rbd_dev;
2981        stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2982                                                   stat_request);
2983        if (!stat_request->osd_req)
2984                goto out;
2985        stat_request->callback = rbd_img_obj_exists_callback;
2986
2987        osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2988        osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2989                                        false, false);
2990        rbd_osd_req_format_read(stat_request);
2991
2992        osdc = &rbd_dev->rbd_client->client->osdc;
2993        ret = rbd_obj_request_submit(osdc, stat_request);
2994out:
2995        if (ret)
2996                rbd_obj_request_put(obj_request);
2997
2998        return ret;
2999}
3000
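/*
 * An object request is "simple" if it can go straight to the OSD:
 * any read, any non-layered write, and any layered write that cannot
 * require a copyup from the parent.
 */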
3001static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
3002{
3003        struct rbd_img_request *img_request;
3004        struct rbd_device *rbd_dev;
3005
3006        rbd_assert(obj_request_img_data_test(obj_request));
3007
3008        img_request = obj_request->img_request;
3009        rbd_assert(img_request);
3010        rbd_dev = img_request->rbd_dev;
3011
3012        /* Reads */
3013        if (!img_request_write_test(img_request) &&
3014            !img_request_discard_test(img_request))
3015                return true;
3016
3017        /* Non-layered writes */
3018        if (!img_request_layered_test(img_request))
3019                return true;
3020
3021        /*
3022         * Layered writes outside of the parent overlap range don't
3023         * share any data with the parent.
3024         */
3025        if (!obj_request_overlaps_parent(obj_request))
3026                return true;
3027
3028        /*
3029         * Entire-object layered writes - we will overwrite whatever
3030         * parent data there is anyway.
3031         */
3032        if (!obj_request->offset &&
3033            obj_request->length == rbd_obj_bytes(&rbd_dev->header))
3034                return true;
3035
3036        /*
3037         * If the object is known to already exist, its parent data has
3038         * already been copied.
3039         */
3040        if (obj_request_known_test(obj_request) &&
3041            obj_request_exists_test(obj_request))
3042                return true;
3043
3044        return false;
3045}
3046
3047static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
3048{
3049        if (img_obj_request_simple(obj_request)) {
3050                struct rbd_device *rbd_dev;
3051                struct ceph_osd_client *osdc;
3052
3053                rbd_dev = obj_request->img_request->rbd_dev;
3054                osdc = &rbd_dev->rbd_client->client->osdc;
3055
3056                return rbd_obj_request_submit(osdc, obj_request);
3057        }
3058
3059        /*
3060         * It's a layered write.  The target object might exist but
3061         * we may not know that yet.  If we know it doesn't exist,
3062         * start by reading the data for the full target object from
3063         * the parent so we can use it for a copyup to the target.
3064         */
3065        if (obj_request_known_test(obj_request))
3066                return rbd_img_obj_parent_read_full(obj_request);
3067
3068        /* We don't know whether the target exists.  Go find out. */
3069
3070        return rbd_img_obj_exists_submit(obj_request);
3071}
3072
3073static int rbd_img_request_submit(struct rbd_img_request *img_request)
3074{
3075        struct rbd_obj_request *obj_request;
3076        struct rbd_obj_request *next_obj_request;
3077        int ret = 0;
3078
3079        dout("%s: img %p\n", __func__, img_request);
3080
3081        rbd_img_request_get(img_request);
3082        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
3083                ret = rbd_img_obj_request_submit(obj_request);
3084                if (ret)
3085                        goto out_put_ireq;
3086        }
3087
3088out_put_ireq:
3089        rbd_img_request_put(img_request);
3090        return ret;
3091}
3092
3093static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3094{
3095        struct rbd_obj_request *obj_request;
3096        struct rbd_device *rbd_dev;
3097        u64 obj_end;
3098        u64 img_xferred;
3099        int img_result;
3100
3101        rbd_assert(img_request_child_test(img_request));
3102
3103        /* First get what we need from the image request and release it */
3104
3105        obj_request = img_request->obj_request;
3106        img_xferred = img_request->xferred;
3107        img_result = img_request->result;
3108        rbd_img_request_put(img_request);
3109
3110        /*
3111         * If the overlap has become 0 (most likely because the
3112         * image has been flattened) we need to re-submit the
3113         * original request.
3114         */
3115        rbd_assert(obj_request);
3116        rbd_assert(obj_request->img_request);
3117        rbd_dev = obj_request->img_request->rbd_dev;
3118        if (!rbd_dev->parent_overlap) {
3119                struct ceph_osd_client *osdc;
3120
3121                osdc = &rbd_dev->rbd_client->client->osdc;
3122                img_result = rbd_obj_request_submit(osdc, obj_request);
3123                if (!img_result)
3124                        return;
3125        }
3126
3127        obj_request->result = img_result;
3128        if (obj_request->result)
3129                goto out;
3130
3131        /*
3132         * We need to zero anything beyond the parent overlap
3133         * boundary.  Since rbd_img_obj_request_read_callback()
3134         * will zero anything beyond the end of a short read, an
3135         * easy way to do this is to pretend the data from the
3136         * parent came up short--ending at the overlap boundary.
3137         */
3138        rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3139        obj_end = obj_request->img_offset + obj_request->length;
3140        if (obj_end > rbd_dev->parent_overlap) {
3141                u64 xferred = 0;
3142
3143                if (obj_request->img_offset < rbd_dev->parent_overlap)
3144                        xferred = rbd_dev->parent_overlap -
3145                                        obj_request->img_offset;
3146
3147                obj_request->xferred = min(img_xferred, xferred);
3148        } else {
3149                obj_request->xferred = img_xferred;
3150        }
3151out:
3152        rbd_img_obj_request_read_callback(obj_request);
3153        rbd_obj_request_complete(obj_request);
3154}
3155
3156static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3157{
3158        struct rbd_img_request *img_request;
3159        int result;
3160
3161        rbd_assert(obj_request_img_data_test(obj_request));
3162        rbd_assert(obj_request->img_request != NULL);
3163        rbd_assert(obj_request->result == (s32) -ENOENT);
3164        rbd_assert(obj_request_type_valid(obj_request->type));
3165
3166        /* rbd_read_finish(obj_request, obj_request->length); */
3167        img_request = rbd_parent_request_create(obj_request,
3168                                                obj_request->img_offset,
3169                                                obj_request->length);
3170        result = -ENOMEM;
3171        if (!img_request)
3172                goto out_err;
3173
3174        if (obj_request->type == OBJ_REQUEST_BIO)
3175                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3176                                                obj_request->bio_list);
3177        else
3178                result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3179                                                obj_request->pages);
3180        if (result)
3181                goto out_err;
3182
3183        img_request->callback = rbd_img_parent_read_callback;
3184        result = rbd_img_request_submit(img_request);
3185        if (result)
3186                goto out_err;
3187
3188        return;
3189out_err:
3190        if (img_request)
3191                rbd_img_request_put(img_request);
3192        obj_request->result = result;
3193        obj_request->xferred = 0;
3194        obj_request_done_set(obj_request);
3195}
3196
3197static const struct rbd_client_id rbd_empty_cid;
3198
3199static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3200                          const struct rbd_client_id *rhs)
3201{
3202        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3203}
3204
3205static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3206{
3207        struct rbd_client_id cid;
3208
3209        mutex_lock(&rbd_dev->watch_mutex);
3210        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3211        cid.handle = rbd_dev->watch_cookie;
3212        mutex_unlock(&rbd_dev->watch_mutex);
3213        return cid;
3214}
3215
3216/*
3217 * lock_rwsem must be held for write
3218 */
3219static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3220                              const struct rbd_client_id *cid)
3221{
3222        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3223             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3224             cid->gid, cid->handle);
3225        rbd_dev->owner_cid = *cid; /* struct */
3226}
3227
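    /*
     * Build the exclusive lock cookie from the watch cookie.  The
     * result is "<RBD_LOCK_COOKIE_PREFIX> <watch_cookie>", e.g.
     * something like "auto 123" (assuming the prefix is "auto").  A
     * u64 takes at most 20 decimal digits, so the 32-byte buffers
     * used by callers leave room for a short prefix plus the NUL.
     */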
3228static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3229{
3230        mutex_lock(&rbd_dev->watch_mutex);
3231        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3232        mutex_unlock(&rbd_dev->watch_mutex);
3233}
3234
3235/*
3236 * lock_rwsem must be held for write
3237 */
3238static int rbd_lock(struct rbd_device *rbd_dev)
3239{
3240        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3241        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3242        char cookie[32];
3243        int ret;
3244
3245        WARN_ON(__rbd_is_lock_owner(rbd_dev));
3246
3247        format_lock_cookie(rbd_dev, cookie);
3248        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3249                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3250                            RBD_LOCK_TAG, "", 0);
3251        if (ret)
3252                return ret;
3253
3254        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3255        rbd_set_owner_cid(rbd_dev, &cid);
3256        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3257        return 0;
3258}
3259
3260/*
3261 * lock_rwsem must be held for write
3262 */
3263static int rbd_unlock(struct rbd_device *rbd_dev)
3264{
3265        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3266        char cookie[32];
3267        int ret;
3268
3269        WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3270
3271        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3272
3273        format_lock_cookie(rbd_dev, cookie);
3274        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3275                              RBD_LOCK_NAME, cookie);
3276        if (ret && ret != -ENOENT) {
3277                rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3278                return ret;
3279        }
3280
3281        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3282        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3283        return 0;
3284}
3285
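    /*
     * Send a lock-related notification on the header object and
     * optionally collect the acks.  The payload is an encoding
     * envelope (CEPH_ENCODING_START_BLK_LEN bytes) followed by the
     * 32-bit notify op and the 64-bit gid and handle of our client
     * id, which is where the 4 + 8 + 8 in buf_size comes from.
     */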
3286static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3287                                enum rbd_notify_op notify_op,
3288                                struct page ***preply_pages,
3289                                size_t *preply_len)
3290{
3291        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3292        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3293        int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3294        char buf[buf_size];
3295        void *p = buf;
3296
3297        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3298
3299        /* encode *LockPayload NotifyMessage (op + ClientId) */
3300        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3301        ceph_encode_32(&p, notify_op);
3302        ceph_encode_64(&p, cid.gid);
3303        ceph_encode_64(&p, cid.handle);
3304
3305        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3306                                &rbd_dev->header_oloc, buf, buf_size,
3307                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3308}
3309
3310static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3311                               enum rbd_notify_op notify_op)
3312{
3313        struct page **reply_pages;
3314        size_t reply_len;
3315
3316        __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3317        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3318}
3319
3320static void rbd_notify_acquired_lock(struct work_struct *work)
3321{
3322        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3323                                                  acquired_lock_work);
3324
3325        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3326}
3327
3328static void rbd_notify_released_lock(struct work_struct *work)
3329{
3330        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3331                                                  released_lock_work);
3332
3333        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3334}
3335
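    /*
     * Ask the current lock owner to release the lock by sending a
     * REQUEST_LOCK notification and decoding the acks.  Returns the
     * owner's ResponseMessage result if exactly one owner responded,
     * -ETIMEDOUT if none did, -EIO if more than one did, or another
     * negative error code.
     */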
3336static int rbd_request_lock(struct rbd_device *rbd_dev)
3337{
3338        struct page **reply_pages;
3339        size_t reply_len;
3340        bool lock_owner_responded = false;
3341        int ret;
3342
3343        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3344
3345        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3346                                   &reply_pages, &reply_len);
3347        if (ret && ret != -ETIMEDOUT) {
3348                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3349                goto out;
3350        }
3351
3352        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3353                void *p = page_address(reply_pages[0]);
3354                void *const end = p + reply_len;
3355                u32 n;
3356
3357                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3358                while (n--) {
3359                        u8 struct_v;
3360                        u32 len;
3361
3362                        ceph_decode_need(&p, end, 8 + 8, e_inval);
3363                        p += 8 + 8; /* skip gid and cookie */
3364
3365                        ceph_decode_32_safe(&p, end, len, e_inval);
3366                        if (!len)
3367                                continue;
3368
3369                        if (lock_owner_responded) {
3370                                rbd_warn(rbd_dev,
3371                                         "duplicate lock owners detected");
3372                                ret = -EIO;
3373                                goto out;
3374                        }
3375
3376                        lock_owner_responded = true;
3377                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3378                                                  &struct_v, &len);
3379                        if (ret) {
3380                                rbd_warn(rbd_dev,
3381                                         "failed to decode ResponseMessage: %d",
3382                                         ret);
3383                                goto e_inval;
3384                        }
3385
3386                        ret = ceph_decode_32(&p);
3387                }
3388        }
3389
3390        if (!lock_owner_responded) {
3391                rbd_warn(rbd_dev, "no lock owners detected");
3392                ret = -ETIMEDOUT;
3393        }
3394
3395out:
3396        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3397        return ret;
3398
3399e_inval:
3400        ret = -EINVAL;
3401        goto out;
3402}
3403
3404static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3405{
3406        dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3407
3408        cancel_delayed_work(&rbd_dev->lock_dwork);
3409        if (wake_all)
3410                wake_up_all(&rbd_dev->lock_waitq);
3411        else
3412                wake_up(&rbd_dev->lock_waitq);
3413}
3414
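    /*
     * Look up who currently holds the exclusive lock on the header
     * object.  Locks taken by something other than rbd (wrong tag,
     * shared lock type, or a foreign cookie prefix) are reported as
     * -EBUSY.
     */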
3415static int get_lock_owner_info(struct rbd_device *rbd_dev,
3416                               struct ceph_locker **lockers, u32 *num_lockers)
3417{
3418        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3419        u8 lock_type;
3420        char *lock_tag;
3421        int ret;
3422
3423        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3424
3425        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3426                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3427                                 &lock_type, &lock_tag, lockers, num_lockers);
3428        if (ret)
3429                return ret;
3430
3431        if (*num_lockers == 0) {
3432                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3433                goto out;
3434        }
3435
3436        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3437                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3438                         lock_tag);
3439                ret = -EBUSY;
3440                goto out;
3441        }
3442
3443        if (lock_type == CEPH_CLS_LOCK_SHARED) {
3444                rbd_warn(rbd_dev, "shared lock type detected");
3445                ret = -EBUSY;
3446                goto out;
3447        }
3448
3449        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3450                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3451                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3452                         (*lockers)[0].id.cookie);
3453                ret = -EBUSY;
3454                goto out;
3455        }
3456
3457out:
3458        kfree(lock_tag);
3459        return ret;
3460}
3461
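    /*
     * Check whether the given locker still has a watch established on
     * the header object (i.e. is still alive).  Returns 1 and records
     * its client id if a matching watcher is found, 0 if not, or a
     * negative error code.
     */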
3462static int find_watcher(struct rbd_device *rbd_dev,
3463                        const struct ceph_locker *locker)
3464{
3465        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3466        struct ceph_watch_item *watchers;
3467        u32 num_watchers;
3468        u64 cookie;
3469        int i;
3470        int ret;
3471
3472        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3473                                      &rbd_dev->header_oloc, &watchers,
3474                                      &num_watchers);
3475        if (ret)
3476                return ret;
3477
3478        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3479        for (i = 0; i < num_watchers; i++) {
3480                if (!memcmp(&watchers[i].addr, &locker->info.addr,
3481                            sizeof(locker->info.addr)) &&
3482                    watchers[i].cookie == cookie) {
3483                        struct rbd_client_id cid = {
3484                                .gid = le64_to_cpu(watchers[i].name.num),
3485                                .handle = cookie,
3486                        };
3487
3488                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3489                             rbd_dev, cid.gid, cid.handle);
3490                        rbd_set_owner_cid(rbd_dev, &cid);
3491                        ret = 1;
3492                        goto out;
3493                }
3494        }
3495
3496        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3497        ret = 0;
3498out:
3499        kfree(watchers);
3500        return ret;
3501}
3502
3503/*
3504 * lock_rwsem must be held for write
3505 */
3506static int rbd_try_lock(struct rbd_device *rbd_dev)
3507{
3508        struct ceph_client *client = rbd_dev->rbd_client->client;
3509        struct ceph_locker *lockers;
3510        u32 num_lockers;
3511        int ret;
3512
3513        for (;;) {
3514                ret = rbd_lock(rbd_dev);
3515                if (ret != -EBUSY)
3516                        return ret;
3517
3518                /* determine if the current lock holder is still alive */
3519                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3520                if (ret)
3521                        return ret;
3522
3523                if (num_lockers == 0)
3524                        goto again;
3525
3526                ret = find_watcher(rbd_dev, lockers);
3527                if (ret) {
3528                        if (ret > 0)
3529                                ret = 0; /* have to request lock */
3530                        goto out;
3531                }
3532
3533                rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3534                         ENTITY_NAME(lockers[0].id.name));
3535
3536                ret = ceph_monc_blacklist_add(&client->monc,
3537                                              &lockers[0].info.addr);
3538                if (ret) {
3539                        rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3540                                 ENTITY_NAME(lockers[0].id.name), ret);
3541                        goto out;
3542                }
3543
3544                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3545                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
3546                                          lockers[0].id.cookie,
3547                                          &lockers[0].id.name);
3548                if (ret && ret != -ENOENT)
3549                        goto out;
3550
3551again:
3552                ceph_free_lockers(lockers, num_lockers);
3553        }
3554
3555out:
3556        ceph_free_lockers(lockers, num_lockers);
3557        return ret;
3558}
3559
3560/*
3561 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3562 */
3563static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3564                                                int *pret)
3565{
3566        enum rbd_lock_state lock_state;
3567
3568        down_read(&rbd_dev->lock_rwsem);
3569        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3570             rbd_dev->lock_state);
3571        if (__rbd_is_lock_owner(rbd_dev)) {
3572                lock_state = rbd_dev->lock_state;
3573                up_read(&rbd_dev->lock_rwsem);
3574                return lock_state;
3575        }
3576
3577        up_read(&rbd_dev->lock_rwsem);
3578        down_write(&rbd_dev->lock_rwsem);
3579        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3580             rbd_dev->lock_state);
3581        if (!__rbd_is_lock_owner(rbd_dev)) {
3582                *pret = rbd_try_lock(rbd_dev);
3583                if (*pret)
3584                        rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3585        }
3586
3587        lock_state = rbd_dev->lock_state;
3588        up_write(&rbd_dev->lock_rwsem);
3589        return lock_state;
3590}
3591
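    /*
     * Delayed work that tries to acquire the exclusive lock.  If the
     * lock is held by someone else, ask them to release it via
     * rbd_request_lock() and requeue ourselves; a timed out request
     * is treated as a dead owner and we immediately try again.
     */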
3592static void rbd_acquire_lock(struct work_struct *work)
3593{
3594        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3595                                            struct rbd_device, lock_dwork);
3596        enum rbd_lock_state lock_state;
3597        int ret;
3598
3599        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3600again:
3601        lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3602        if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3603                if (lock_state == RBD_LOCK_STATE_LOCKED)
3604                        wake_requests(rbd_dev, true);
3605                dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3606                     rbd_dev, lock_state, ret);
3607                return;
3608        }
3609
3610        ret = rbd_request_lock(rbd_dev);
3611        if (ret == -ETIMEDOUT) {
3612                goto again; /* treat this as a dead client */
3613        } else if (ret < 0) {
3614                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3615                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3616                                 RBD_RETRY_DELAY);
3617        } else {
3618                /*
3619                 * lock owner acked, but resend if we don't see them
3620                 * release the lock
3621                 */
3622                dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3623                     rbd_dev);
3624                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3625                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3626        }
3627}
3628
3629/*
3630 * lock_rwsem must be held for write
3631 */
3632static bool rbd_release_lock(struct rbd_device *rbd_dev)
3633{
3634        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3635             rbd_dev->lock_state);
3636        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3637                return false;
3638
3639        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3640        downgrade_write(&rbd_dev->lock_rwsem);
3641        /*
3642         * Ensure that all in-flight IO is flushed.
3643         *
3644         * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3645         * may be shared with other devices.
3646         */
3647        ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3648        up_read(&rbd_dev->lock_rwsem);
3649
3650        down_write(&rbd_dev->lock_rwsem);
3651        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3652             rbd_dev->lock_state);
3653        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3654                return false;
3655
3656        if (!rbd_unlock(rbd_dev))
3657                /*
3658                 * Give others a chance to grab the lock - we would re-acquire
3659                 * almost immediately if we got new IO during ceph_osdc_sync()
3660                 * otherwise.  We need to ack our own notifications, so this
3661                 * lock_dwork will be requeued from rbd_wait_state_locked()
3662                 * after wake_requests() in rbd_handle_released_lock().
3663                 */
3664                cancel_delayed_work(&rbd_dev->lock_dwork);
3665
3666        return true;
3667}
3668
3669static void rbd_release_lock_work(struct work_struct *work)
3670{
3671        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3672                                                  unlock_work);
3673
3674        down_write(&rbd_dev->lock_rwsem);
3675        rbd_release_lock(rbd_dev);
3676        up_write(&rbd_dev->lock_rwsem);
3677}
3678
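    /*
     * Handle an ACQUIRED_LOCK notification.  struct_v >= 2 payloads
     * carry the new owner's client id, which we record as the current
     * owner.  The RELEASED_LOCK and REQUEST_LOCK handlers below decode
     * the same payload format.
     */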
3679static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3680                                     void **p)
3681{
3682        struct rbd_client_id cid = { 0 };
3683
3684        if (struct_v >= 2) {
3685                cid.gid = ceph_decode_64(p);
3686                cid.handle = ceph_decode_64(p);
3687        }
3688
3689        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3690             cid.handle);
3691        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3692                down_write(&rbd_dev->lock_rwsem);
3693                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3694                        /*
3695                         * we already know that the remote client is
3696                         * the owner
3697                         */
3698                        up_write(&rbd_dev->lock_rwsem);
3699                        return;
3700                }
3701
3702                rbd_set_owner_cid(rbd_dev, &cid);
3703                downgrade_write(&rbd_dev->lock_rwsem);
3704        } else {
3705                down_read(&rbd_dev->lock_rwsem);
3706        }
3707
3708        if (!__rbd_is_lock_owner(rbd_dev))
3709                wake_requests(rbd_dev, false);
3710        up_read(&rbd_dev->lock_rwsem);
3711}
3712
3713static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3714                                     void **p)
3715{
3716        struct rbd_client_id cid = { 0 };
3717
3718        if (struct_v >= 2) {
3719                cid.gid = ceph_decode_64(p);
3720                cid.handle = ceph_decode_64(p);
3721        }
3722
3723        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3724             cid.handle);
3725        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3726                down_write(&rbd_dev->lock_rwsem);
3727                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3728                        dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3729                             __func__, rbd_dev, cid.gid, cid.handle,
3730                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3731                        up_write(&rbd_dev->lock_rwsem);
3732                        return;
3733                }
3734
3735                rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3736                downgrade_write(&rbd_dev->lock_rwsem);
3737        } else {
3738                down_read(&rbd_dev->lock_rwsem);
3739        }
3740
3741        if (!__rbd_is_lock_owner(rbd_dev))
3742                wake_requests(rbd_dev, false);
3743        up_read(&rbd_dev->lock_rwsem);
3744}
3745
3746static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3747                                    void **p)
3748{
3749        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3750        struct rbd_client_id cid = { 0 };
3751        bool need_to_send;
3752
3753        if (struct_v >= 2) {
3754                cid.gid = ceph_decode_64(p);
3755                cid.handle = ceph_decode_64(p);
3756        }
3757
3758        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3759             cid.handle);
3760        if (rbd_cid_equal(&cid, &my_cid))
3761                return false;
3762
3763        down_read(&rbd_dev->lock_rwsem);
3764        need_to_send = __rbd_is_lock_owner(rbd_dev);
3765        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3766                if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3767                        dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3768                             rbd_dev);
3769                        queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3770                }
3771        }
3772        up_read(&rbd_dev->lock_rwsem);
3773        return need_to_send;
3774}
3775
3776static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3777                                     u64 notify_id, u64 cookie, s32 *result)
3778{
3779        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3780        int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3781        char buf[buf_size];
3782        int ret;
3783
3784        if (result) {
3785                void *p = buf;
3786
3787                /* encode ResponseMessage */
3788                ceph_start_encoding(&p, 1, 1,
3789                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
3790                ceph_encode_32(&p, *result);
3791        } else {
3792                buf_size = 0;
3793        }
3794
3795        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3796                                   &rbd_dev->header_oloc, notify_id, cookie,
3797                                   buf, buf_size);
3798        if (ret)
3799                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3800}
3801
3802static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3803                                   u64 cookie)
3804{
3805        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3807}
3808
3809static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3810                                          u64 notify_id, u64 cookie, s32 result)
3811{
3812        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3813        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3814}
3815
3816static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3817                         u64 notifier_id, void *data, size_t data_len)
3818{
3819        struct rbd_device *rbd_dev = arg;
3820        void *p = data;
3821        void *const end = p + data_len;
3822        u8 struct_v = 0; /* shut up gcc */
3823        u32 len;
3824        u32 notify_op;
3825        int ret;
3826
3827        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3828             __func__, rbd_dev, cookie, notify_id, data_len);
3829        if (data_len) {
3830                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3831                                          &struct_v, &len);
3832                if (ret) {
3833                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3834                                 ret);
3835                        return;
3836                }
3837
3838                notify_op = ceph_decode_32(&p);
3839        } else {
3840                /* legacy notification for header updates */
3841                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3842                len = 0;
3843        }
3844
3845        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3846        switch (notify_op) {
3847        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3848                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3849                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3850                break;
3851        case RBD_NOTIFY_OP_RELEASED_LOCK:
3852                rbd_handle_released_lock(rbd_dev, struct_v, &p);
3853                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3854                break;
3855        case RBD_NOTIFY_OP_REQUEST_LOCK:
3856                if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3857                        /*
3858                         * send ResponseMessage(0) back so the client
3859                         * can detect a missing owner
3860                         */
3861                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
3862                                                      cookie, 0);
3863                else
3864                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3865                break;
3866        case RBD_NOTIFY_OP_HEADER_UPDATE:
3867                ret = rbd_dev_refresh(rbd_dev);
3868                if (ret)
3869                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
3870
3871                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3872                break;
3873        default:
3874                if (rbd_is_lock_owner(rbd_dev))
3875                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
3876                                                      cookie, -EOPNOTSUPP);
3877                else
3878                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3879                break;
3880        }
3881}
3882
3883static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3884
3885static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3886{
3887        struct rbd_device *rbd_dev = arg;
3888
3889        rbd_warn(rbd_dev, "encountered watch error: %d", err);
3890
3891        down_write(&rbd_dev->lock_rwsem);
3892        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3893        up_write(&rbd_dev->lock_rwsem);
3894
3895        mutex_lock(&rbd_dev->watch_mutex);
3896        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3897                __rbd_unregister_watch(rbd_dev);
3898                rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3899
3900                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3901        }
3902        mutex_unlock(&rbd_dev->watch_mutex);
3903}
3904
3905/*
3906 * watch_mutex must be locked
3907 */
3908static int __rbd_register_watch(struct rbd_device *rbd_dev)
3909{
3910        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3911        struct ceph_osd_linger_request *handle;
3912
3913        rbd_assert(!rbd_dev->watch_handle);
3914        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3915
3916        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3917                                 &rbd_dev->header_oloc, rbd_watch_cb,
3918                                 rbd_watch_errcb, rbd_dev);
3919        if (IS_ERR(handle))
3920                return PTR_ERR(handle);
3921
3922        rbd_dev->watch_handle = handle;
3923        return 0;
3924}
3925
3926/*
3927 * watch_mutex must be locked
3928 */
3929static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3930{
3931        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3932        int ret;
3933
3934        rbd_assert(rbd_dev->watch_handle);
3935        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3936
3937        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3938        if (ret)
3939                rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3940
3941        rbd_dev->watch_handle = NULL;
3942}
3943
3944static int rbd_register_watch(struct rbd_device *rbd_dev)
3945{
3946        int ret;
3947
3948        mutex_lock(&rbd_dev->watch_mutex);
3949        rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3950        ret = __rbd_register_watch(rbd_dev);
3951        if (ret)
3952                goto out;
3953
3954        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3955        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3956
3957out:
3958        mutex_unlock(&rbd_dev->watch_mutex);
3959        return ret;
3960}
3961
3962static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3963{
3964        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3965
3966        cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3967        cancel_work_sync(&rbd_dev->acquired_lock_work);
3968        cancel_work_sync(&rbd_dev->released_lock_work);
3969        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3970        cancel_work_sync(&rbd_dev->unlock_work);
3971}
3972
3973static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3974{
3975        WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3976        cancel_tasks_sync(rbd_dev);
3977
3978        mutex_lock(&rbd_dev->watch_mutex);
3979        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3980                __rbd_unregister_watch(rbd_dev);
3981        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3982        mutex_unlock(&rbd_dev->watch_mutex);
3983
3984        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3985}
3986
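    /*
     * Delayed work that re-establishes the header watch after a watch
     * error.  If re-registration fails with -EBLACKLISTED or -ENOENT
     * the device is marked blacklisted; otherwise we retry after
     * RBD_RETRY_DELAY.  If we held the exclusive lock, try to
     * re-acquire it once the watch is back.
     */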
3987static void rbd_reregister_watch(struct work_struct *work)
3988{
3989        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3990                                            struct rbd_device, watch_dwork);
3991        bool was_lock_owner = false;
3992        bool need_to_wake = false;
3993        int ret;
3994
3995        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3996
3997        down_write(&rbd_dev->lock_rwsem);
3998        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3999                was_lock_owner = rbd_release_lock(rbd_dev);
4000
4001        mutex_lock(&rbd_dev->watch_mutex);
4002        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4003                mutex_unlock(&rbd_dev->watch_mutex);
4004                goto out;
4005        }
4006
4007        ret = __rbd_register_watch(rbd_dev);
4008        if (ret) {
4009                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4010                if (ret == -EBLACKLISTED || ret == -ENOENT) {
4011                        set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
4012                        need_to_wake = true;
4013                } else {
4014                        queue_delayed_work(rbd_dev->task_wq,
4015                                           &rbd_dev->watch_dwork,
4016                                           RBD_RETRY_DELAY);
4017                }
4018                mutex_unlock(&rbd_dev->watch_mutex);
4019                goto out;
4020        }
4021
4022        need_to_wake = true;
4023        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4024        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4025        mutex_unlock(&rbd_dev->watch_mutex);
4026
4027        ret = rbd_dev_refresh(rbd_dev);
4028        if (ret)
4029                rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4030
4031        if (was_lock_owner) {
4032                ret = rbd_try_lock(rbd_dev);
4033                if (ret)
4034                        rbd_warn(rbd_dev, "reregistration lock failed: %d",
4035                                 ret);
4036        }
4037
4038out:
4039        up_write(&rbd_dev->lock_rwsem);
4040        if (need_to_wake)
4041                wake_requests(rbd_dev, true);
4042}
4043
4044/*
4045 * Synchronous osd object method call.  Returns the number of bytes
4046 * returned in the inbound buffer, or a negative error code.
4047 */
4048static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4049                             const char *object_name,
4050                             const char *class_name,
4051                             const char *method_name,
4052                             const void *outbound,
4053                             size_t outbound_size,
4054                             void *inbound,
4055                             size_t inbound_size)
4056{
4057        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4058        struct rbd_obj_request *obj_request;
4059        struct page **pages;
4060        u32 page_count;
4061        int ret;
4062
4063        /*
4064         * Method calls are ultimately read operations.  The result
4065         * should be placed into the inbound buffer provided.  They
4066         * also supply outbound data--parameters for the object
4067         * method.  Currently if this is present it will be a
4068         * snapshot id.
4069         */
4070        page_count = (u32)calc_pages_for(0, inbound_size);
4071        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4072        if (IS_ERR(pages))
4073                return PTR_ERR(pages);
4074
4075        ret = -ENOMEM;
4076        obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
4077                                                        OBJ_REQUEST_PAGES);
4078        if (!obj_request)
4079                goto out;
4080
4081        obj_request->pages = pages;
4082        obj_request->page_count = page_count;
4083
4084        obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4085                                                  obj_request);
4086        if (!obj_request->osd_req)
4087                goto out;
4088
4089        osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
4090                                        class_name, method_name);
4091        if (outbound_size) {
4092                struct ceph_pagelist *pagelist;
4093
4094                pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4095                if (!pagelist)
4096                        goto out;
4097
4098                ceph_pagelist_init(pagelist);
4099                ceph_pagelist_append(pagelist, outbound, outbound_size);
4100                osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4101                                                pagelist);
4102        }
4103        osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4104                                        obj_request->pages, inbound_size,
4105                                        0, false, false);
4106        rbd_osd_req_format_read(obj_request);
4107
4108        ret = rbd_obj_request_submit(osdc, obj_request);
4109        if (ret)
4110                goto out;
4111        ret = rbd_obj_request_wait(obj_request);
4112        if (ret)
4113                goto out;
4114
4115        ret = obj_request->result;
4116        if (ret < 0)
4117                goto out;
4118
4119        rbd_assert(obj_request->xferred < (u64)INT_MAX);
4120        ret = (int)obj_request->xferred;
4121        ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
4122out:
4123        if (obj_request)
4124                rbd_obj_request_put(obj_request);
4125        else
4126                ceph_release_page_vector(pages, page_count);
4127
4128        return ret;
4129}
4130
4131/*
4132 * lock_rwsem must be held for read
4133 */
4134static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4135{
4136        DEFINE_WAIT(wait);
4137
4138        do {
4139                /*
4140                 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4141                 * and cancel_delayed_work() in wake_requests().
4142                 */
4143                dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4144                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4145                prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4146                                          TASK_UNINTERRUPTIBLE);
4147                up_read(&rbd_dev->lock_rwsem);
4148                schedule();
4149                down_read(&rbd_dev->lock_rwsem);
4150        } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4151                 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4152
4153        finish_wait(&rbd_dev->lock_waitq, &wait);
4154}
4155
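    /*
     * Per-request work function, run from the rbd workqueue.  It
     * translates a block layer request into an rbd image request,
     * waiting for the exclusive lock first when the mapping requires
     * it, and completes the request on any early error.
     */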
4156static void rbd_queue_workfn(struct work_struct *work)
4157{
4158        struct request *rq = blk_mq_rq_from_pdu(work);
4159        struct rbd_device *rbd_dev = rq->q->queuedata;
4160        struct rbd_img_request *img_request;
4161        struct ceph_snap_context *snapc = NULL;
4162        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4163        u64 length = blk_rq_bytes(rq);
4164        enum obj_operation_type op_type;
4165        u64 mapping_size;
4166        bool must_be_locked;
4167        int result;
4168
4169        if (rq->cmd_type != REQ_TYPE_FS) {
4170                dout("%s: non-fs request type %d\n", __func__,
4171                        (int) rq->cmd_type);
4172                result = -EIO;
4173                goto err;
4174        }
4175
4176        if (rq->cmd_flags & REQ_DISCARD)
4177                op_type = OBJ_OP_DISCARD;
4178        else if (rq->cmd_flags & REQ_WRITE)
4179                op_type = OBJ_OP_WRITE;
4180        else
4181                op_type = OBJ_OP_READ;
4182
4183        /* Ignore/skip any zero-length requests */
4184
4185        if (!length) {
4186                dout("%s: zero-length request\n", __func__);
4187                result = 0;
4188                goto err_rq;
4189        }
4190
4191        /* Only reads are allowed to a read-only device */
4192
4193        if (op_type != OBJ_OP_READ) {
4194                if (rbd_dev->mapping.read_only) {
4195                        result = -EROFS;
4196                        goto err_rq;
4197                }
4198                rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4199        }
4200
4201        /*
4202         * Quit early if the mapped snapshot no longer exists.  It's
4203         * still possible the snapshot will have disappeared by the
4204         * time our request arrives at the osd, but there's no sense in
4205         * sending it if we already know.
4206         */
4207        if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4208                dout("request for non-existent snapshot");
4209                rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4210                result = -ENXIO;
4211                goto err_rq;
4212        }
4213
4214        if (offset && length > U64_MAX - offset + 1) {
4215                rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4216                         length);
4217                result = -EINVAL;
4218                goto err_rq;    /* Shouldn't happen */
4219        }
4220
4221        blk_mq_start_request(rq);
4222
4223        down_read(&rbd_dev->header_rwsem);
4224        mapping_size = rbd_dev->mapping.size;
4225        if (op_type != OBJ_OP_READ) {
4226                snapc = rbd_dev->header.snapc;
4227                ceph_get_snap_context(snapc);
4228                must_be_locked = rbd_is_lock_supported(rbd_dev);
4229        } else {
4230                must_be_locked = rbd_dev->opts->lock_on_read &&
4231                                        rbd_is_lock_supported(rbd_dev);
4232        }
4233        up_read(&rbd_dev->header_rwsem);
4234
4235        if (offset + length > mapping_size) {
4236                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4237                         length, mapping_size);
4238                result = -EIO;
4239                goto err_rq;
4240        }
4241
4242        if (must_be_locked) {
4243                down_read(&rbd_dev->lock_rwsem);
4244                if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4245                    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4246                        rbd_wait_state_locked(rbd_dev);
4247
4248                WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4249                        !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4250                if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4251                        result = -EBLACKLISTED;
4252                        goto err_unlock;
4253                }
4254        }
4255
4256        img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4257                                             snapc);
4258        if (!img_request) {
4259                result = -ENOMEM;
4260                goto err_unlock;
4261        }
4262        img_request->rq = rq;
4263        snapc = NULL; /* img_request consumes a ref */
4264
4265        if (op_type == OBJ_OP_DISCARD)
4266                result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4267                                              NULL);
4268        else
4269                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4270                                              rq->bio);
4271        if (result)
4272                goto err_img_request;
4273
4274        result = rbd_img_request_submit(img_request);
4275        if (result)
4276                goto err_img_request;
4277
4278        if (must_be_locked)
4279                up_read(&rbd_dev->lock_rwsem);
4280        return;
4281
4282err_img_request:
4283        rbd_img_request_put(img_request);
4284err_unlock:
4285        if (must_be_locked)
4286                up_read(&rbd_dev->lock_rwsem);
4287err_rq:
4288        if (result)
4289                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4290                         obj_op_name(op_type), length, offset, result);
4291        ceph_put_snap_context(snapc);
4292err:
4293        blk_mq_end_request(rq, result);
4294}
4295
4296static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4297                const struct blk_mq_queue_data *bd)
4298{
4299        struct request *rq = bd->rq;
4300        struct work_struct *work = blk_mq_rq_to_pdu(rq);
4301
4302        queue_work(rbd_wq, work);
4303        return BLK_MQ_RQ_QUEUE_OK;
4304}
4305
4306/*
4307 * A queue callback.  Makes sure that we don't create a bio that spans
4308 * across multiple osd objects.  One exception would be a single-page
4309 * bio, which we handle later in bio_chain_clone_range().
4310 */
4311static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
4312                          struct bio_vec *bvec)
4313{
4314        struct rbd_device *rbd_dev = q->queuedata;
4315        sector_t sector_offset;
4316        sector_t sectors_per_obj;
4317        sector_t obj_sector_offset;
4318        int ret;
4319
4320        /*
4321         * Find how far into its rbd object the bio's start sector
4322         * falls.  The partition-relative start sector is first made
4323         * relative to the enclosing (whole) device.
4324         */
4325        sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
4326        sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
4327        obj_sector_offset = sector_offset & (sectors_per_obj - 1);
4328
4329        /*
4330         * Compute the number of bytes from that offset to the end
4331         * of the object.  Account for what's already used by the bio.
4332         */
4333        ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
4334        if (ret > bmd->bi_size)
4335                ret -= bmd->bi_size;
4336        else
4337                ret = 0;
4338
4339        /*
4340         * Don't send back more than was asked for.  And if the bio
4341         * was empty, let the whole thing through because:  "Note
4342         * that a block device *must* allow a single page to be
4343         * added to an empty bio."
4344         */
4345        rbd_assert(bvec->bv_len <= PAGE_SIZE);
4346        if (ret > (int) bvec->bv_len || !bmd->bi_size)
4347                ret = (int) bvec->bv_len;
4348
4349        return ret;
4350}
4351
4352static void rbd_free_disk(struct rbd_device *rbd_dev)
4353{
4354        struct gendisk *disk = rbd_dev->disk;
4355
4356        if (!disk)
4357                return;
4358
4359        rbd_dev->disk = NULL;
4360        if (disk->flags & GENHD_FL_UP) {
4361                del_gendisk(disk);
4362                if (disk->queue)
4363                        blk_cleanup_queue(disk->queue);
4364                blk_mq_free_tag_set(&rbd_dev->tag_set);
4365        }
4366        put_disk(disk);
4367}
4368
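    /*
     * Synchronously read the given byte range of the named object
     * into buf.  Returns the number of bytes actually read, or a
     * negative error code.
     */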
4369static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4370                                const char *object_name,
4371                                u64 offset, u64 length, void *buf)
4372
4373{
4374        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4375        struct rbd_obj_request *obj_request;
4376        struct page **pages = NULL;
4377        u32 page_count;
4378        size_t size;
4379        int ret;
4380
4381        page_count = (u32) calc_pages_for(offset, length);
4382        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4383        if (IS_ERR(pages))
4384                return PTR_ERR(pages);
4385
4386        ret = -ENOMEM;
4387        obj_request = rbd_obj_request_create(object_name, offset, length,
4388                                                        OBJ_REQUEST_PAGES);
4389        if (!obj_request)
4390                goto out;
4391
4392        obj_request->pages = pages;
4393        obj_request->page_count = page_count;
4394
4395        obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4396                                                  obj_request);
4397        if (!obj_request->osd_req)
4398                goto out;
4399
4400        osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4401                                        offset, length, 0, 0);
4402        osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
4403                                        obj_request->pages,
4404                                        obj_request->length,
4405                                        obj_request->offset & ~PAGE_MASK,
4406                                        false, false);
4407        rbd_osd_req_format_read(obj_request);
4408
4409        ret = rbd_obj_request_submit(osdc, obj_request);
4410        if (ret)
4411                goto out;
4412        ret = rbd_obj_request_wait(obj_request);
4413        if (ret)
4414                goto out;
4415
4416        ret = obj_request->result;
4417        if (ret < 0)
4418                goto out;
4419
4420        rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4421        size = (size_t) obj_request->xferred;
4422        ceph_copy_from_page_vector(pages, buf, 0, size);
4423        rbd_assert(size <= (size_t)INT_MAX);
4424        ret = (int)size;
4425out:
4426        if (obj_request)
4427                rbd_obj_request_put(obj_request);
4428        else
4429                ceph_release_page_vector(pages, page_count);
4430
4431        return ret;
4432}
4433
4434/*
4435 * Read the complete header for the given rbd device.  On successful
4436 * return, the rbd_dev->header field will contain up-to-date
4437 * information about the image.
4438 */
4439static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4440{
4441        struct rbd_image_header_ondisk *ondisk = NULL;
4442        u32 snap_count = 0;
4443        u64 names_size = 0;
4444        u32 want_count;
4445        int ret;
4446
4447        /*
4448         * The complete header will include an array of its 64-bit
4449         * snapshot ids, followed by the names of those snapshots as
4450         * a contiguous block of NUL-terminated strings.  Note that
4451         * the number of snapshots could change by the time we read
4452         * it in, in which case we re-read it.
4453         */
4454        do {
4455                size_t size;
4456
4457                kfree(ondisk);
4458
4459                size = sizeof (*ondisk);
4460                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4461                size += names_size;
4462                ondisk = kmalloc(size, GFP_KERNEL);
4463                if (!ondisk)
4464                        return -ENOMEM;
4465
4466                ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
4467                                       0, size, ondisk);
4468                if (ret < 0)
4469                        goto out;
4470                if ((size_t)ret < size) {
4471                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4472                                size, ret);
4473                        ret = -ENXIO;
4474                        goto out;
4475                }
4476                if (!rbd_dev_ondisk_valid(ondisk)) {
4477                        ret = -ENXIO;
4478                        rbd_warn(rbd_dev, "invalid header");
4479                        goto out;
4480                }
4481
4482                names_size = le64_to_cpu(ondisk->snap_names_len);
4483                want_count = snap_count;
4484                snap_count = le32_to_cpu(ondisk->snap_count);
4485        } while (snap_count != want_count);
4486
4487        ret = rbd_header_from_disk(rbd_dev, ondisk);
4488out:
4489        kfree(ondisk);
4490
4491        return ret;
4492}
4493
4494/*
4495 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4496 * has disappeared from the (just updated) snapshot context.
4497 */
4498static void rbd_exists_validate(struct rbd_device *rbd_dev)
4499{
4500        u64 snap_id;
4501
4502        if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4503                return;
4504
4505        snap_id = rbd_dev->spec->snap_id;
4506        if (snap_id == CEPH_NOSNAP)
4507                return;
4508
4509        if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4510                clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4511}
4512
4513static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4514{
4515        sector_t size;
4516
4517        /*
4518         * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4519         * try to update its size.  If REMOVING is set, updating size
4520         * is just useless work since the device can't be opened.
4521         */
4522        if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4523            !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4524                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4525                dout("setting size to %llu sectors", (unsigned long long)size);
4526                set_capacity(rbd_dev->disk, size);
4527                revalidate_disk(rbd_dev->disk);
4528        }
4529}
4530
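/*
 * Re-read the image header and bring the mapping up to date (called,
 * for example, from the sysfs "refresh" attribute below).  The header
 * and, if present, the parent info are refreshed under header_rwsem;
 * if the mapping size changed, the block device capacity is updated
 * once the semaphore has been dropped.
 */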
4531static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4532{
4533        u64 mapping_size;
4534        int ret;
4535
4536        down_write(&rbd_dev->header_rwsem);
4537        mapping_size = rbd_dev->mapping.size;
4538
4539        ret = rbd_dev_header_info(rbd_dev);
4540        if (ret)
4541                goto out;
4542
4543        /*
4544         * If there is a parent, see if it has disappeared due to the
4545         * mapped image getting flattened.
4546         */
4547        if (rbd_dev->parent) {
4548                ret = rbd_dev_v2_parent_info(rbd_dev);
4549                if (ret)
4550                        goto out;
4551        }
4552
4553        if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4554                rbd_dev->mapping.size = rbd_dev->header.image_size;
4555        } else {
4556                /* validate mapped snapshot's EXISTS flag */
4557                rbd_exists_validate(rbd_dev);
4558        }
4559
4560out:
4561        up_write(&rbd_dev->header_rwsem);
4562        if (!ret && mapping_size != rbd_dev->mapping.size)
4563                rbd_dev_update_size(rbd_dev);
4564
4565        return ret;
4566}
4567
4568static int rbd_init_request(void *data, struct request *rq,
4569                unsigned int hctx_idx, unsigned int request_idx,
4570                unsigned int numa_node)
4571{
4572        struct work_struct *work = blk_mq_rq_to_pdu(rq);
4573
4574        INIT_WORK(work, rbd_queue_workfn);
4575        return 0;
4576}
4577
4578static struct blk_mq_ops rbd_mq_ops = {
4579        .queue_rq       = rbd_queue_rq,
4580        .map_queue      = blk_mq_map_queue,
4581        .init_request   = rbd_init_request,
4582};
4583
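/*
 * Set up the gendisk and blk-mq request queue for a mapping: a single
 * hardware queue whose per-request payload is the work_struct used by
 * rbd_queue_workfn(), with I/O and discard limits derived from the
 * image's object size.
 */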
4584static int rbd_init_disk(struct rbd_device *rbd_dev)
4585{
4586        struct gendisk *disk;
4587        struct request_queue *q;
4588        u64 segment_size;
4589        int err;
4590
4591        /* create gendisk info */
4592        disk = alloc_disk(single_major ?
4593                          (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4594                          RBD_MINORS_PER_MAJOR);
4595        if (!disk)
4596                return -ENOMEM;
4597
4598        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4599                 rbd_dev->dev_id);
4600        disk->major = rbd_dev->major;
4601        disk->first_minor = rbd_dev->minor;
4602        if (single_major)
4603                disk->flags |= GENHD_FL_EXT_DEVT;
4604        disk->fops = &rbd_bd_ops;
4605        disk->private_data = rbd_dev;
4606
4607        memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4608        rbd_dev->tag_set.ops = &rbd_mq_ops;
4609        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4610        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4611        rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4612        rbd_dev->tag_set.nr_hw_queues = 1;
4613        rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4614
4615        err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4616        if (err)
4617                goto out_disk;
4618
4619        q = blk_mq_init_queue(&rbd_dev->tag_set);
4620        if (IS_ERR(q)) {
4621                err = PTR_ERR(q);
4622                goto out_tag_set;
4623        }
4624
4625        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4626        /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4627
4628        /* set io sizes to object size */
4629        segment_size = rbd_obj_bytes(&rbd_dev->header);
4630        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4631        q->limits.max_sectors = queue_max_hw_sectors(q);
4632        blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4633        blk_queue_max_segment_size(q, segment_size);
4634        blk_queue_io_min(q, segment_size);
4635        blk_queue_io_opt(q, segment_size);
4636
4637        /* enable the discard support */
4638        queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4639        q->limits.discard_granularity = segment_size;
4640        q->limits.discard_alignment = segment_size;
4641        q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
4642        q->limits.discard_zeroes_data = 1;
4643
4644        blk_queue_merge_bvec(q, rbd_merge_bvec);
4645
4646        if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4647                q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4648
4649        disk->queue = q;
4650
4651        q->queuedata = rbd_dev;
4652
4653        rbd_dev->disk = disk;
4654
4655        return 0;
4656out_tag_set:
4657        blk_mq_free_tag_set(&rbd_dev->tag_set);
4658out_disk:
4659        put_disk(disk);
4660        return err;
4661}
4662
4663/*
4664  sysfs
4665*/
4666
4667static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4668{
4669        return container_of(dev, struct rbd_device, dev);
4670}
4671
4672static ssize_t rbd_size_show(struct device *dev,
4673                             struct device_attribute *attr, char *buf)
4674{
4675        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4676
4677        return sprintf(buf, "%llu\n",
4678                (unsigned long long)rbd_dev->mapping.size);
4679}
4680
4681/*
4682 * Note this shows the features for whatever's mapped, which is not
4683 * necessarily the base image.
4684 */
4685static ssize_t rbd_features_show(struct device *dev,
4686                             struct device_attribute *attr, char *buf)
4687{
4688        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4689
4690        return sprintf(buf, "0x%016llx\n",
4691                        (unsigned long long)rbd_dev->mapping.features);
4692}
4693
4694static ssize_t rbd_major_show(struct device *dev,
4695                              struct device_attribute *attr, char *buf)
4696{
4697        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4698
4699        if (rbd_dev->major)
4700                return sprintf(buf, "%d\n", rbd_dev->major);
4701
4702        return sprintf(buf, "(none)\n");
4703}
4704
4705static ssize_t rbd_minor_show(struct device *dev,
4706                              struct device_attribute *attr, char *buf)
4707{
4708        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4709
4710        return sprintf(buf, "%d\n", rbd_dev->minor);
4711}
4712
4713static ssize_t rbd_client_addr_show(struct device *dev,
4714                                    struct device_attribute *attr, char *buf)
4715{
4716        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4717        struct ceph_entity_addr *client_addr =
4718            ceph_client_addr(rbd_dev->rbd_client->client);
4719
4720        return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4721                       le32_to_cpu(client_addr->nonce));
4722}
4723
4724static ssize_t rbd_client_id_show(struct device *dev,
4725                                  struct device_attribute *attr, char *buf)
4726{
4727        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4728
4729        return sprintf(buf, "client%lld\n",
4730                       ceph_client_gid(rbd_dev->rbd_client->client));
4731}
4732
4733static ssize_t rbd_cluster_fsid_show(struct device *dev,
4734                                     struct device_attribute *attr, char *buf)
4735{
4736        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4737
4738        return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4739}
4740
4741static ssize_t rbd_config_info_show(struct device *dev,
4742                                    struct device_attribute *attr, char *buf)
4743{
4744        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4745
4746        return sprintf(buf, "%s\n", rbd_dev->config_info);
4747}
4748
4749static ssize_t rbd_pool_show(struct device *dev,
4750                             struct device_attribute *attr, char *buf)
4751{
4752        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4753
4754        return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4755}
4756
4757static ssize_t rbd_pool_id_show(struct device *dev,
4758                             struct device_attribute *attr, char *buf)
4759{
4760        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4761
4762        return sprintf(buf, "%llu\n",
4763                        (unsigned long long) rbd_dev->spec->pool_id);
4764}
4765
4766static ssize_t rbd_name_show(struct device *dev,
4767                             struct device_attribute *attr, char *buf)
4768{
4769        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4770
4771        if (rbd_dev->spec->image_name)
4772                return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4773
4774        return sprintf(buf, "(unknown)\n");
4775}
4776
4777static ssize_t rbd_image_id_show(struct device *dev,
4778                             struct device_attribute *attr, char *buf)
4779{
4780        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4781
4782        return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4783}
4784
4785/*
4786 * Shows the name of the currently-mapped snapshot (or
4787 * RBD_SNAP_HEAD_NAME for the base image).
4788 */
4789static ssize_t rbd_snap_show(struct device *dev,
4790                             struct device_attribute *attr,
4791                             char *buf)
4792{
4793        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4794
4795        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4796}
4797
4798static ssize_t rbd_snap_id_show(struct device *dev,
4799                                struct device_attribute *attr, char *buf)
4800{
4801        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4802
4803        return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4804}
4805
4806/*
4807 * For a v2 image, shows the chain of parent images, separated by empty
4808 * lines.  For v1 images or if there is no parent, shows "(no parent
4809 * image)".
4810 */
4811static ssize_t rbd_parent_show(struct device *dev,
4812                               struct device_attribute *attr,
4813                               char *buf)
4814{
4815        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4816        ssize_t count = 0;
4817
4818        if (!rbd_dev->parent)
4819                return sprintf(buf, "(no parent image)\n");
4820
4821        for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4822                struct rbd_spec *spec = rbd_dev->parent_spec;
4823
4824                count += sprintf(&buf[count], "%s"
4825                            "pool_id %llu\npool_name %s\n"
4826                            "image_id %s\nimage_name %s\n"
4827                            "snap_id %llu\nsnap_name %s\n"
4828                            "overlap %llu\n",
4829                            !count ? "" : "\n", /* first? */
4830                            spec->pool_id, spec->pool_name,
4831                            spec->image_id, spec->image_name ?: "(unknown)",
4832                            spec->snap_id, spec->snap_name,
4833                            rbd_dev->parent_overlap);
4834        }
4835
4836        return count;
4837}
4838
4839static ssize_t rbd_image_refresh(struct device *dev,
4840                                 struct device_attribute *attr,
4841                                 const char *buf,
4842                                 size_t size)
4843{
4844        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4845        int ret;
4846
4847        ret = rbd_dev_refresh(rbd_dev);
4848        if (ret)
4849                return ret;
4850
4851        return size;
4852}
4853
4854static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4855static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4856static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4857static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4858static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4859static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4860static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4861static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4862static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4863static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4864static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4865static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4866static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4867static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4868static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4869static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4870
4871static struct attribute *rbd_attrs[] = {
4872        &dev_attr_size.attr,
4873        &dev_attr_features.attr,
4874        &dev_attr_major.attr,
4875        &dev_attr_minor.attr,
4876        &dev_attr_client_addr.attr,
4877        &dev_attr_client_id.attr,
4878        &dev_attr_cluster_fsid.attr,
4879        &dev_attr_config_info.attr,
4880        &dev_attr_pool.attr,
4881        &dev_attr_pool_id.attr,
4882        &dev_attr_name.attr,
4883        &dev_attr_image_id.attr,
4884        &dev_attr_current_snap.attr,
4885        &dev_attr_snap_id.attr,
4886        &dev_attr_parent.attr,
4887        &dev_attr_refresh.attr,
4888        NULL
4889};
4890
4891static struct attribute_group rbd_attr_group = {
4892        .attrs = rbd_attrs,
4893};
4894
4895static const struct attribute_group *rbd_attr_groups[] = {
4896        &rbd_attr_group,
4897        NULL
4898};
4899
4900static void rbd_dev_release(struct device *dev);
4901
4902static struct device_type rbd_device_type = {
4903        .name           = "rbd",
4904        .groups         = rbd_attr_groups,
4905        .release        = rbd_dev_release,
4906};
4907
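/*
 * An rbd_spec is a reference-counted bundle of the ids and names that
 * identify an image (pool, image, snapshot).  The helpers below manage
 * its lifetime; rbd_spec_put() tolerates a NULL pointer.
 */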
4908static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4909{
4910        kref_get(&spec->kref);
4911
4912        return spec;
4913}
4914
4915static void rbd_spec_free(struct kref *kref);
4916static void rbd_spec_put(struct rbd_spec *spec)
4917{
4918        if (spec)
4919                kref_put(&spec->kref, rbd_spec_free);
4920}
4921
4922static struct rbd_spec *rbd_spec_alloc(void)
4923{
4924        struct rbd_spec *spec;
4925
4926        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4927        if (!spec)
4928                return NULL;
4929
4930        spec->pool_id = CEPH_NOPOOL;
4931        spec->snap_id = CEPH_NOSNAP;
4932        kref_init(&spec->kref);
4933
4934        return spec;
4935}
4936
4937static void rbd_spec_free(struct kref *kref)
4938{
4939        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4940
4941        kfree(spec->pool_name);
4942        kfree(spec->image_id);
4943        kfree(spec->image_name);
4944        kfree(spec->snap_name);
4945        kfree(spec);
4946}
4947
4948static void rbd_dev_free(struct rbd_device *rbd_dev)
4949{
4950        WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4951        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4952
4953        ceph_oid_destroy(&rbd_dev->header_oid);
4954        kfree(rbd_dev->config_info);
4955
4956        rbd_put_client(rbd_dev->rbd_client);
4957        rbd_spec_put(rbd_dev->spec);
4958        kfree(rbd_dev->opts);
4959        kfree(rbd_dev);
4960}
4961
4962static void rbd_dev_release(struct device *dev)
4963{
4964        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4965        bool need_put = !!rbd_dev->opts;
4966
4967        if (need_put) {
4968                destroy_workqueue(rbd_dev->task_wq);
4969                ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4970        }
4971
4972        rbd_dev_free(rbd_dev);
4973
4974        /*
4975         * This is racy, but way better than putting module outside of
4976         * the release callback.  The race window is pretty small, so
4977         * doing something similar to dm (dm-builtin.c) is overkill.
4978         */
4979        if (need_put)
4980                module_put(THIS_MODULE);
4981}
4982
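/*
 * Allocate a bare rbd_device and initialize its locks, work items,
 * file layout and embedded struct device.  The rbdc and spec pointers
 * are recorded as given (no additional references are taken here);
 * rbd_dev_create() below builds a full mapping device on top of this.
 */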
4983static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4984                                           struct rbd_spec *spec)
4985{
4986        struct rbd_device *rbd_dev;
4987
4988        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4989        if (!rbd_dev)
4990                return NULL;
4991
4992        spin_lock_init(&rbd_dev->lock);
4993        INIT_LIST_HEAD(&rbd_dev->node);
4994        init_rwsem(&rbd_dev->header_rwsem);
4995
4996        ceph_oid_init(&rbd_dev->header_oid);
4997        ceph_oloc_init(&rbd_dev->header_oloc);
4998
4999        mutex_init(&rbd_dev->watch_mutex);
5000        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5001        INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5002
5003        init_rwsem(&rbd_dev->lock_rwsem);
5004        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5005        INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5006        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5007        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5008        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5009        init_waitqueue_head(&rbd_dev->lock_waitq);
5010
5011        rbd_dev->dev.bus = &rbd_bus_type;
5012        rbd_dev->dev.type = &rbd_device_type;
5013        rbd_dev->dev.parent = &rbd_root_dev;
5014        device_initialize(&rbd_dev->dev);
5015
5016        rbd_dev->rbd_client = rbdc;
5017        rbd_dev->spec = spec;
5018
5019        rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
5020        rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
5021        rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
5022        rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
5023
5024        return rbd_dev;
5025}
5026
5027/*
5028 * Create a mapping rbd_dev.
5029 */
5030static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5031                                         struct rbd_spec *spec,
5032                                         struct rbd_options *opts)
5033{
5034        struct rbd_device *rbd_dev;
5035
5036        rbd_dev = __rbd_dev_create(rbdc, spec);
5037        if (!rbd_dev)
5038                return NULL;
5039
5040        rbd_dev->opts = opts;
5041
5042        /* get an id and fill in device name */
5043        rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5044                                         minor_to_rbd_dev_id(1 << MINORBITS),
5045                                         GFP_KERNEL);
5046        if (rbd_dev->dev_id < 0)
5047                goto fail_rbd_dev;
5048
5049        sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5050        rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5051                                                   rbd_dev->name);
5052        if (!rbd_dev->task_wq)
5053                goto fail_dev_id;
5054
5055        /* we have a ref from do_rbd_add() */
5056        __module_get(THIS_MODULE);
5057
5058        dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5059        return rbd_dev;
5060
5061fail_dev_id:
5062        ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5063fail_rbd_dev:
5064        rbd_dev_free(rbd_dev);
5065        return NULL;
5066}
5067
5068static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5069{
5070        if (rbd_dev)
5071                put_device(&rbd_dev->dev);
5072}
5073
5074/*
5075 * Get the size and object order for an image snapshot, or if
5076 * snap_id is CEPH_NOSNAP, gets this information for the base
5077 * image.
5078 */
5079static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5080                                u8 *order, u64 *snap_size)
5081{
5082        __le64 snapid = cpu_to_le64(snap_id);
5083        int ret;
5084        struct {
5085                u8 order;
5086                __le64 size;
5087        } __attribute__ ((packed)) size_buf = { 0 };
5088
5089        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5090                                "rbd", "get_size",
5091                                &snapid, sizeof (snapid),
5092                                &size_buf, sizeof (size_buf));
5093        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5094        if (ret < 0)
5095                return ret;
5096        if (ret < sizeof (size_buf))
5097                return -ERANGE;
5098
5099        if (order) {
5100                *order = size_buf.order;
5101                dout("  order %u", (unsigned int)*order);
5102        }
5103        *snap_size = le64_to_cpu(size_buf.size);
5104
5105        dout("  snap_id 0x%016llx snap_size = %llu\n",
5106                (unsigned long long)snap_id,
5107                (unsigned long long)*snap_size);
5108
5109        return 0;
5110}
5111
5112static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5113{
5114        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5115                                        &rbd_dev->header.obj_order,
5116                                        &rbd_dev->header.image_size);
5117}
5118
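/*
 * Fetch the image's object name prefix via the "get_object_prefix"
 * class method and record the decoded string in the in-core header.
 */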
5119static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5120{
5121        void *reply_buf;
5122        int ret;
5123        void *p;
5124
5125        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
5126        if (!reply_buf)
5127                return -ENOMEM;
5128
5129        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5130                                "rbd", "get_object_prefix", NULL, 0,
5131                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5132        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5133        if (ret < 0)
5134                goto out;
5135
5136        p = reply_buf;
5137        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5138                                                p + ret, NULL, GFP_NOIO);
5139        ret = 0;
5140
5141        if (IS_ERR(rbd_dev->header.object_prefix)) {
5142                ret = PTR_ERR(rbd_dev->header.object_prefix);
5143                rbd_dev->header.object_prefix = NULL;
5144        } else {
5145                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5146        }
5147out:
5148        kfree(reply_buf);
5149
5150        return ret;
5151}
5152
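/*
 * Fetch the feature bits for the given snapshot (or for the base image
 * if snap_id is CEPH_NOSNAP).  Fails with -ENXIO if the image requires
 * incompatible features this driver does not support.
 */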
5153static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5154                u64 *snap_features)
5155{
5156        __le64 snapid = cpu_to_le64(snap_id);
5157        struct {
5158                __le64 features;
5159                __le64 incompat;
5160        } __attribute__ ((packed)) features_buf = { 0 };
5161        u64 unsup;
5162        int ret;
5163
5164        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5165                                "rbd", "get_features",
5166                                &snapid, sizeof (snapid),
5167                                &features_buf, sizeof (features_buf));
5168        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5169        if (ret < 0)
5170                return ret;
5171        if (ret < sizeof (features_buf))
5172                return -ERANGE;
5173
5174        unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5175        if (unsup) {
5176                rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5177                         unsup);
5178                return -ENXIO;
5179        }
5180
5181        *snap_features = le64_to_cpu(features_buf.features);
5182
5183        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5184                (unsigned long long)snap_id,
5185                (unsigned long long)*snap_features,
5186                (unsigned long long)le64_to_cpu(features_buf.incompat));
5187
5188        return 0;
5189}
5190
5191static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5192{
5193        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5194                                                &rbd_dev->header.features);
5195}
5196
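/*
 * Fetch parent (clone) information for the mapped snapshot via the
 * "get_parent" class method.  The parent spec is recorded only the
 * first time through, while the parent overlap is updated on every
 * call; a pool id of CEPH_NOPOOL means there is no parent, either
 * because none ever existed or because the clone has been flattened.
 */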
5197static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5198{
5199        struct rbd_spec *parent_spec;
5200        size_t size;
5201        void *reply_buf = NULL;
5202        __le64 snapid;
5203        void *p;
5204        void *end;
5205        u64 pool_id;
5206        char *image_id;
5207        u64 snap_id;
5208        u64 overlap;
5209        int ret;
5210
5211        parent_spec = rbd_spec_alloc();
5212        if (!parent_spec)
5213                return -ENOMEM;
5214
5215        size = sizeof (__le64) +                                /* pool_id */
5216                sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
5217                sizeof (__le64) +                               /* snap_id */
5218                sizeof (__le64);                                /* overlap */
5219        reply_buf = kmalloc(size, GFP_KERNEL);
5220        if (!reply_buf) {
5221                ret = -ENOMEM;
5222                goto out_err;
5223        }
5224
5225        snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5226        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5227                                "rbd", "get_parent",
5228                                &snapid, sizeof (snapid),
5229                                reply_buf, size);
5230        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5231        if (ret < 0)
5232                goto out_err;
5233
5234        p = reply_buf;
5235        end = reply_buf + ret;
5236        ret = -ERANGE;
5237        ceph_decode_64_safe(&p, end, pool_id, out_err);
5238        if (pool_id == CEPH_NOPOOL) {
5239                /*
5240                 * Either the parent never existed, or we have
5241                 * record of it but the image got flattened so it no
5242                 * longer has a parent.  When the parent of a
5243                 * layered image disappears we immediately set the
5244                 * overlap to 0.  The effect of this is that all new
5245                 * requests will be treated as if the image had no
5246                 * parent.
5247                 */
5248                if (rbd_dev->parent_overlap) {
5249                        rbd_dev->parent_overlap = 0;
5250                        rbd_dev_parent_put(rbd_dev);
5251                        pr_info("%s: clone image has been flattened\n",
5252                                rbd_dev->disk->disk_name);
5253                }
5254
5255                goto out;       /* No parent?  No problem. */
5256        }
5257
5258        /* The ceph file layout needs to fit pool id in 32 bits */
5259
5260        ret = -EIO;
5261        if (pool_id > (u64)U32_MAX) {
5262                rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5263                        (unsigned long long)pool_id, U32_MAX);
5264                goto out_err;
5265        }
5266
5267        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5268        if (IS_ERR(image_id)) {
5269                ret = PTR_ERR(image_id);
5270                goto out_err;
5271        }
5272        ceph_decode_64_safe(&p, end, snap_id, out_err);
5273        ceph_decode_64_safe(&p, end, overlap, out_err);
5274
5275        /*
5276         * The parent won't change (except when the clone is
5277         * flattened, which is handled above).  So we only need to
5278         * record the parent spec if we have not already done so.
5279         */
5280        if (!rbd_dev->parent_spec) {
5281                parent_spec->pool_id = pool_id;
5282                parent_spec->image_id = image_id;
5283                parent_spec->snap_id = snap_id;
5284                rbd_dev->parent_spec = parent_spec;
5285                parent_spec = NULL;     /* rbd_dev now owns this */
5286        } else {
5287                kfree(image_id);
5288        }
5289
5290        /*
5291         * We always update the parent overlap.  If it's zero we issue
5292         * a warning, as we will proceed as if there was no parent.
5293         */
5294        if (!overlap) {
5295                if (parent_spec) {
5296                        /* refresh, careful to warn just once */
5297                        if (rbd_dev->parent_overlap)
5298                                rbd_warn(rbd_dev,
5299                                    "clone now standalone (overlap became 0)");
5300                } else {
5301                        /* initial probe */
5302                        rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5303                }
5304        }
5305        rbd_dev->parent_overlap = overlap;
5306
5307out:
5308        ret = 0;
5309out_err:
5310        kfree(reply_buf);
5311        rbd_spec_put(parent_spec);
5312
5313        return ret;
5314}
5315
5316static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5317{
5318        struct {
5319                __le64 stripe_unit;
5320                __le64 stripe_count;
5321        } __attribute__ ((packed)) striping_info_buf = { 0 };
5322        size_t size = sizeof (striping_info_buf);
5323        void *p;
5324        u64 obj_size;
5325        u64 stripe_unit;
5326        u64 stripe_count;
5327        int ret;
5328
5329        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5330                                "rbd", "get_stripe_unit_count", NULL, 0,
5331                                (char *)&striping_info_buf, size);
5332        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5333        if (ret < 0)
5334                return ret;
5335        if (ret < size)
5336                return -ERANGE;
5337
5338        /*
5339         * We don't actually support the "fancy striping" feature
5340         * (STRIPINGV2) yet, but if the striping sizes are the
5341         * defaults the behavior is the same as before.  So find
5342         * out, and only fail if the image has non-default values.
5343         */
5344        ret = -EINVAL;
5345        obj_size = (u64)1 << rbd_dev->header.obj_order;
5346        p = &striping_info_buf;
5347        stripe_unit = ceph_decode_64(&p);
5348        if (stripe_unit != obj_size) {
5349                rbd_warn(rbd_dev, "unsupported stripe unit "
5350                                "(got %llu want %llu)",
5351                                stripe_unit, obj_size);
5352                return -EINVAL;
5353        }
5354        stripe_count = ceph_decode_64(&p);
5355        if (stripe_count != 1) {
5356                rbd_warn(rbd_dev, "unsupported stripe count "
5357                                "(got %llu want 1)", stripe_count);
5358                return -EINVAL;
5359        }
5360        rbd_dev->header.stripe_unit = stripe_unit;
5361        rbd_dev->header.stripe_count = stripe_count;
5362
5363        return 0;
5364}
5365
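/*
 * Look up the image name for rbd_dev's image id in the pool's
 * RBD_DIRECTORY object ("dir_get_name").  Returns a newly allocated
 * string, or NULL if the name could not be determined; callers
 * tolerate the latter.
 */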
5366static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5367{
5368        size_t image_id_size;
5369        char *image_id;
5370        void *p;
5371        void *end;
5372        size_t size;
5373        void *reply_buf = NULL;
5374        size_t len = 0;
5375        char *image_name = NULL;
5376        int ret;
5377
5378        rbd_assert(!rbd_dev->spec->image_name);
5379
5380        len = strlen(rbd_dev->spec->image_id);
5381        image_id_size = sizeof (__le32) + len;
5382        image_id = kmalloc(image_id_size, GFP_KERNEL);
5383        if (!image_id)
5384                return NULL;
5385
5386        p = image_id;
5387        end = image_id + image_id_size;
5388        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5389
5390        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5391        reply_buf = kmalloc(size, GFP_KERNEL);
5392        if (!reply_buf)
5393                goto out;
5394
5395        ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
5396                                "rbd", "dir_get_name",
5397                                image_id, image_id_size,
5398                                reply_buf, size);
5399        if (ret < 0)
5400                goto out;
5401        p = reply_buf;
5402        end = reply_buf + ret;
5403
5404        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5405        if (IS_ERR(image_name))
5406                image_name = NULL;
5407        else
5408                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5409out:
5410        kfree(reply_buf);
5411        kfree(image_id);
5412
5413        return image_name;
5414}
5415
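/*
 * Map a snapshot name to its id for a format 1 image by walking the
 * header's packed list of NUL-terminated snapshot names in step with
 * the snapshot context.  Returns CEPH_NOSNAP if the name is not found.
 */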
5416static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5417{
5418        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5419        const char *snap_name;
5420        u32 which = 0;
5421
5422        /* Skip over names until we find the one we are looking for */
5423
5424        snap_name = rbd_dev->header.snap_names;
5425        while (which < snapc->num_snaps) {
5426                if (!strcmp(name, snap_name))
5427                        return snapc->snaps[which];
5428                snap_name += strlen(snap_name) + 1;
5429                which++;
5430        }
5431        return CEPH_NOSNAP;
5432}
5433
5434static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5435{
5436        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5437        u32 which;
5438        bool found = false;
5439        u64 snap_id;
5440
5441        for (which = 0; !found && which < snapc->num_snaps; which++) {
5442                const char *snap_name;
5443
5444                snap_id = snapc->snaps[which];
5445                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5446                if (IS_ERR(snap_name)) {
5447                        /* ignore no-longer existing snapshots */
5448                        if (PTR_ERR(snap_name) == -ENOENT)
5449                                continue;
5450                        else
5451                                break;
5452                }
5453                found = !strcmp(name, snap_name);
5454                kfree(snap_name);
5455        }
5456        return found ? snap_id : CEPH_NOSNAP;
5457}
5458
5459/*
5460 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5461 * no snapshot by that name is found, or if an error occurs.
5462 */
5463static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5464{
5465        if (rbd_dev->image_format == 1)
5466                return rbd_v1_snap_id_by_name(rbd_dev, name);
5467
5468        return rbd_v2_snap_id_by_name(rbd_dev, name);
5469}
5470
5471/*
5472 * An image being mapped will have everything but the snap id.
5473 */
5474static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5475{
5476        struct rbd_spec *spec = rbd_dev->spec;
5477
5478        rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5479        rbd_assert(spec->image_id && spec->image_name);
5480        rbd_assert(spec->snap_name);
5481
5482        if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5483                u64 snap_id;
5484
5485                snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5486                if (snap_id == CEPH_NOSNAP)
5487                        return -ENOENT;
5488
5489                spec->snap_id = snap_id;
5490        } else {
5491                spec->snap_id = CEPH_NOSNAP;
5492        }
5493
5494        return 0;
5495}
5496
5497/*
5498 * A parent image will have all ids but none of the names.
5499 *
5500 * All names in an rbd spec are dynamically allocated.  It's OK if we
5501 * can't figure out the name for an image id.
5502 */
5503static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5504{
5505        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5506        struct rbd_spec *spec = rbd_dev->spec;
5507        const char *pool_name;
5508        const char *image_name;
5509        const char *snap_name;
5510        int ret;
5511
5512        rbd_assert(spec->pool_id != CEPH_NOPOOL);
5513        rbd_assert(spec->image_id);
5514        rbd_assert(spec->snap_id != CEPH_NOSNAP);
5515
5516        /* Get the pool name; we have to make our own copy of this */
5517
5518        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5519        if (!pool_name) {
5520                rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5521                return -EIO;
5522        }
5523        pool_name = kstrdup(pool_name, GFP_KERNEL);
5524        if (!pool_name)
5525                return -ENOMEM;
5526
5527        /* Fetch the image name; tolerate failure here */
5528
5529        image_name = rbd_dev_image_name(rbd_dev);
5530        if (!image_name)
5531                rbd_warn(rbd_dev, "unable to get image name");
5532
5533        /* Fetch the snapshot name */
5534
5535        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5536        if (IS_ERR(snap_name)) {
5537                ret = PTR_ERR(snap_name);
5538                goto out_err;
5539        }
5540
5541        spec->pool_name = pool_name;
5542        spec->image_name = image_name;
5543        spec->snap_name = snap_name;
5544
5545        return 0;
5546
5547out_err:
5548        kfree(image_name);
5549        kfree(pool_name);
5550        return ret;
5551}
5552
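/*
 * Fetch the image's snapshot context (seq value plus the array of
 * snapshot ids) via "get_snapcontext" and install it in the in-core
 * header, dropping the reference to any previous snapshot context.
 */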
5553static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5554{
5555        size_t size;
5556        int ret;
5557        void *reply_buf;
5558        void *p;
5559        void *end;
5560        u64 seq;
5561        u32 snap_count;
5562        struct ceph_snap_context *snapc;
5563        u32 i;
5564
5565        /*
5566         * We'll need room for the seq value (maximum snapshot id),
5567         * snapshot count, and array of that many snapshot ids.
5568         * For now we have a fixed upper limit on the number we're
5569         * prepared to receive.
5570         */
5571        size = sizeof (__le64) + sizeof (__le32) +
5572                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
5573        reply_buf = kzalloc(size, GFP_KERNEL);
5574        if (!reply_buf)
5575                return -ENOMEM;
5576
5577        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5578                                "rbd", "get_snapcontext", NULL, 0,
5579                                reply_buf, size);
5580        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5581        if (ret < 0)
5582                goto out;
5583
5584        p = reply_buf;
5585        end = reply_buf + ret;
5586        ret = -ERANGE;
5587        ceph_decode_64_safe(&p, end, seq, out);
5588        ceph_decode_32_safe(&p, end, snap_count, out);
5589
5590        /*
5591         * Make sure the reported number of snapshot ids wouldn't go
5592         * beyond the end of our buffer.  But before checking that,
5593         * make sure the computed size of the snapshot context we
5594         * allocate is representable in a size_t.
5595         */
5596        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5597                                 / sizeof (u64)) {
5598                ret = -EINVAL;
5599                goto out;
5600        }
5601        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5602                goto out;
5603        ret = 0;
5604
5605        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5606        if (!snapc) {
5607                ret = -ENOMEM;
5608                goto out;
5609        }
5610        snapc->seq = seq;
5611        for (i = 0; i < snap_count; i++)
5612                snapc->snaps[i] = ceph_decode_64(&p);
5613
5614        ceph_put_snap_context(rbd_dev->header.snapc);
5615        rbd_dev->header.snapc = snapc;
5616
5617        dout("  snap context seq = %llu, snap_count = %u\n",
5618                (unsigned long long)seq, (unsigned int)snap_count);
5619out:
5620        kfree(reply_buf);
5621
5622        return ret;
5623}
5624
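/*
 * Fetch the name of a single snapshot by id via "get_snapshot_name".
 * Returns a newly allocated string or an ERR_PTR() on failure.
 */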
5625static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5626                                        u64 snap_id)
5627{
5628        size_t size;
5629        void *reply_buf;
5630        __le64 snapid;
5631        int ret;
5632        void *p;
5633        void *end;
5634        char *snap_name;
5635
5636        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5637        reply_buf = kmalloc(size, GFP_KERNEL);
5638        if (!reply_buf)
5639                return ERR_PTR(-ENOMEM);
5640
5641        snapid = cpu_to_le64(snap_id);
5642        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5643                                "rbd", "get_snapshot_name",
5644                                &snapid, sizeof (snapid),
5645                                reply_buf, size);
5646        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5647        if (ret < 0) {
5648                snap_name = ERR_PTR(ret);
5649                goto out;
5650        }
5651
5652        p = reply_buf;
5653        end = reply_buf + ret;
5654        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5655        if (IS_ERR(snap_name))
5656                goto out;
5657
5658        dout("  snap_id 0x%016llx snap_name = %s\n",
5659                (unsigned long long)snap_id, snap_name);
5660out:
5661        kfree(reply_buf);
5662
5663        return snap_name;
5664}
5665
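/*
 * Refresh the format 2 header: the image size on every call, the
 * one-time pieces (object prefix, features, striping) only on the
 * first call, and the snapshot context last.  If the snapshot context
 * can't be fetched on that first call, the object prefix is freed
 * again so a later attempt starts from a clean slate.
 */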
5666static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5667{
5668        bool first_time = rbd_dev->header.object_prefix == NULL;
5669        int ret;
5670
5671        ret = rbd_dev_v2_image_size(rbd_dev);
5672        if (ret)
5673                return ret;
5674
5675        if (first_time) {
5676                ret = rbd_dev_v2_header_onetime(rbd_dev);
5677                if (ret)
5678                        return ret;
5679        }
5680
5681        ret = rbd_dev_v2_snap_context(rbd_dev);
5682        if (ret && first_time) {
5683                kfree(rbd_dev->header.object_prefix);
5684                rbd_dev->header.object_prefix = NULL;
5685        }
5686
5687        return ret;
5688}
5689
5690static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5691{
5692        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5693
5694        if (rbd_dev->image_format == 1)
5695                return rbd_dev_v1_header_info(rbd_dev);
5696
5697        return rbd_dev_v2_header_info(rbd_dev);
5698}
5699
5700/*
5701 * Skips over white space at *buf, and updates *buf to point to the
5702 * first found non-space character (if any). Returns the length of
5703 * the token (string of non-white space characters) found.  Note
5704 * that *buf must be terminated with '\0'.
5705 */
5706static inline size_t next_token(const char **buf)
5707{
5708        /*
5709         * These are the characters that produce nonzero for
5710         * isspace() in the "C" and "POSIX" locales.
5711         */
5712        const char *spaces = " \f\n\r\t\v";
5713
5714        *buf += strspn(*buf, spaces);   /* Find start of token */
5715
5716        return strcspn(*buf, spaces);   /* Return token length */
5717}
5718
5719/*
5720 * Finds the next token in *buf, dynamically allocates a buffer big
5721 * enough to hold a copy of it, and copies the token into the new
5722 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5723 * that a duplicate buffer is created even for a zero-length token.
5724 *
5725 * Returns a pointer to the newly-allocated duplicate, or a null
5726 * pointer if memory for the duplicate was not available.  If
5727 * the lenp argument is a non-null pointer, the length of the token
5728 * (not including the '\0') is returned in *lenp.
5729 *
5730 * If successful, the *buf pointer will be updated to point beyond
5731 * the end of the found token.
5732 *
5733 * Note: uses GFP_KERNEL for allocation.
5734 */
5735static inline char *dup_token(const char **buf, size_t *lenp)
5736{
5737        char *dup;
5738        size_t len;
5739
5740        len = next_token(buf);
5741        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5742        if (!dup)
5743                return NULL;
5744        *(dup + len) = '\0';
5745        *buf += len;
5746
5747        if (lenp)
5748                *lenp = len;
5749
5750        return dup;
5751}
5752
5753/*
5754 * Parse the options provided for an "rbd add" (i.e., rbd image
5755 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5756 * and the data written is passed here via a NUL-terminated buffer.
5757 * Returns 0 if successful or an error code otherwise.
5758 *
5759 * The information extracted from these options is recorded in
5760 * the other parameters which return dynamically-allocated
5761 * structures:
5762 *  ceph_opts
5763 *      The address of a pointer that will refer to a ceph options
5764 *      structure.  Caller must release the returned pointer using
5765 *      ceph_destroy_options() when it is no longer needed.
5766 *  rbd_opts
5767 *      Address of an rbd options pointer.  Fully initialized by
5768 *      this function; caller must release with kfree().
5769 *  spec
5770 *      Address of an rbd image specification pointer.  Fully
5771 *      initialized by this function based on parsed options.
5772 *      Caller must release with rbd_spec_put().
5773 *
5774 * The options passed take this form:
5775 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5776 * where:
5777 *  <mon_addrs>
5778 *      A comma-separated list of one or more monitor addresses.
5779 *      A monitor address is an ip address, optionally followed
5780 *      by a port number (separated by a colon).
5781 *        I.e.:  ip1[:port1][,ip2[:port2]...]
5782 *  <options>
5783 *      A comma-separated list of ceph and/or rbd options.
5784 *  <pool_name>
5785 *      The name of the rados pool containing the rbd image.
5786 *  <image_name>
5787 *      The name of the image in that pool to map.
5788 *  <snap_id>
5789 *      An optional snapshot id.  If provided, the mapping will
5790 *      present data from the image at the time that snapshot was
5791 *      created.  The image head is used if no snapshot id is
5792 *      provided.  Snapshot mappings are always read-only.
5793 */
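/*
 * A hypothetical example of the buffer format described above, as it
 * might be written to /sys/bus/rbd/add; the monitor address, the
 * credentials and the pool/image/snapshot names are placeholders:
 *
 *   1.2.3.4:6789 name=admin,secret=<key> mypool myimage mysnap
 */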
5794static int rbd_add_parse_args(const char *buf,
5795                                struct ceph_options **ceph_opts,
5796                                struct rbd_options **opts,
5797                                struct rbd_spec **rbd_spec)
5798{
5799        size_t len;
5800        char *options;
5801        const char *mon_addrs;
5802        char *snap_name;
5803        size_t mon_addrs_size;
5804        struct rbd_spec *spec = NULL;
5805        struct rbd_options *rbd_opts = NULL;
5806        struct ceph_options *copts;
5807        int ret;
5808
5809        /* The first four tokens are required */
5810
5811        len = next_token(&buf);
5812        if (!len) {
5813                rbd_warn(NULL, "no monitor address(es) provided");
5814                return -EINVAL;
5815        }
5816        mon_addrs = buf;
5817        mon_addrs_size = len + 1;
5818        buf += len;
5819
5820        ret = -EINVAL;
5821        options = dup_token(&buf, NULL);
5822        if (!options)
5823                return -ENOMEM;
5824        if (!*options) {
5825                rbd_warn(NULL, "no options provided");
5826                goto out_err;
5827        }
5828
5829        spec = rbd_spec_alloc();
5830        if (!spec)
5831                goto out_mem;
5832
5833        spec->pool_name = dup_token(&buf, NULL);
5834        if (!spec->pool_name)
5835                goto out_mem;
5836        if (!*spec->pool_name) {
5837                rbd_warn(NULL, "no pool name provided");
5838                goto out_err;
5839        }
5840
5841        spec->image_name = dup_token(&buf, NULL);
5842        if (!spec->image_name)
5843                goto out_mem;
5844        if (!*spec->image_name) {
5845                rbd_warn(NULL, "no image name provided");
5846                goto out_err;
5847        }
5848
5849        /*
5850         * Snapshot name is optional; default is to use "-"
5851         * (indicating the head/no snapshot).
5852         */
5853        len = next_token(&buf);
5854        if (!len) {
5855                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5856                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5857        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5858                ret = -ENAMETOOLONG;
5859                goto out_err;
5860        }
5861        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5862        if (!snap_name)
5863                goto out_mem;
5864        *(snap_name + len) = '\0';
5865        spec->snap_name = snap_name;
5866
5867        /* Initialize all rbd options to the defaults */
5868
5869        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5870        if (!rbd_opts)
5871                goto out_mem;
5872
5873        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5874        rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5875        rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5876
5877        copts = ceph_parse_options(options, mon_addrs,
5878                                        mon_addrs + mon_addrs_size - 1,
5879                                        parse_rbd_opts_token, rbd_opts);
5880        if (IS_ERR(copts)) {
5881                ret = PTR_ERR(copts);
5882                goto out_err;
5883        }
5884        kfree(options);
5885
5886        *ceph_opts = copts;
5887        *opts = rbd_opts;
5888        *rbd_spec = spec;
5889
5890        return 0;
5891out_mem:
5892        ret = -ENOMEM;
5893out_err:
5894        kfree(rbd_opts);
5895        rbd_spec_put(spec);
5896        kfree(options);
5897
5898        return ret;
5899}
5900
5901/*
5902 * Return pool id (>= 0) or a negative error code.
5903 */
5904static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5905{
5906        struct ceph_options *opts = rbdc->client->options;
5907        u64 newest_epoch;
5908        int tries = 0;
5909        int ret;
5910
5911again:
5912        ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5913        if (ret == -ENOENT && tries++ < 1) {
5914                ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5915                                            &newest_epoch);
5916                if (ret < 0)
5917                        return ret;
5918
5919                if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5920                        ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5921                        (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5922                                                     newest_epoch,
5923                                                     opts->mount_timeout);
5924                        goto again;
5925                } else {
5926                        /* the osdmap we have is new enough */
5927                        return -ENOENT;
5928                }
5929        }
5930
5931        return ret;
5932}
5933
5934/*
5935 * An rbd format 2 image has a unique identifier, distinct from the
5936 * name given to it by the user.  Internally, that identifier is
5937 * what's used to specify the names of objects related to the image.
5938 *
5939 * A special "rbd id" object is used to map an rbd image name to its
5940 * id.  If that object doesn't exist, then there is no v2 rbd image
5941 * with the supplied name.
5942 *
5943 * This function will record the given rbd_dev's image_id field if
5944 * it can be determined, and in that case will return 0.  If any
5945 * errors occur a negative errno will be returned and the rbd_dev's
5946 * image_id field will be unchanged (and should be NULL).
5947 */
5948static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5949{
5950        int ret;
5951        size_t size;
5952        char *object_name;
5953        void *response;
5954        char *image_id;
5955
5956        /*
5957         * When probing a parent image, the image id is already
5958         * known (and the image name likely is not).  There's no
5959         * need to fetch the image id again in this case.  We
5960         * do still need to set the image format though.
5961         */
5962        if (rbd_dev->spec->image_id) {
5963                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5964
5965                return 0;
5966        }
5967
5968        /*
5969         * First, see if the format 2 image id file exists, and if
5970         * so, get the image's persistent id from it.
5971         */
5972        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5973        object_name = kmalloc(size, GFP_NOIO);
5974        if (!object_name)
5975                return -ENOMEM;
5976        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5977        dout("rbd id object name is %s\n", object_name);
5978
5979        /* Response will be an encoded string, which includes a length */
5980
5981        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5982        response = kzalloc(size, GFP_NOIO);
5983        if (!response) {
5984                ret = -ENOMEM;
5985                goto out;
5986        }
5987
5988        /* If it doesn't exist we'll assume it's a format 1 image */
5989
5990        ret = rbd_obj_method_sync(rbd_dev, object_name,
5991                                "rbd", "get_id", NULL, 0,
5992                                response, RBD_IMAGE_ID_LEN_MAX);
5993        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5994        if (ret == -ENOENT) {
5995                image_id = kstrdup("", GFP_KERNEL);
5996                ret = image_id ? 0 : -ENOMEM;
5997                if (!ret)
5998                        rbd_dev->image_format = 1;
5999        } else if (ret >= 0) {
6000                void *p = response;
6001
6002                image_id = ceph_extract_encoded_string(&p, p + ret,
6003                                                NULL, GFP_NOIO);
6004                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
6005                if (!ret)
6006                        rbd_dev->image_format = 2;
6007        }
6008
6009        if (!ret) {
6010                rbd_dev->spec->image_id = image_id;
6011                dout("image_id is %s\n", image_id);
6012        }
6013out:
6014        kfree(response);
6015        kfree(object_name);
6016
6017        return ret;
6018}
6019
6020/*
6021 * Undo whatever state changes are made by v1 or v2 header info
6022 * call.
6023 */
6024static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6025{
6026        struct rbd_image_header *header;
6027
6028        rbd_dev_parent_put(rbd_dev);
6029
6030        /* Free dynamic fields from the header, then zero it out */
6031
6032        header = &rbd_dev->header;
6033        ceph_put_snap_context(header->snapc);
6034        kfree(header->snap_sizes);
6035        kfree(header->snap_names);
6036        kfree(header->object_prefix);
6037        memset(header, 0, sizeof (*header));
6038}
6039
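/*
 * Fetch the pieces of the format 2 header that are read only once per
 * probe: the object prefix, the feature bits and, if fancy striping is
 * advertised, the striping parameters.
 */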
6040static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6041{
6042        int ret;
6043
6044        ret = rbd_dev_v2_object_prefix(rbd_dev);
6045        if (ret)
6046                goto out_err;
6047
6048        /*
6049         * Get and check the features for the image.  Currently the
6050         * features are assumed to never change.
6051         */
6052        ret = rbd_dev_v2_features(rbd_dev);
6053        if (ret)
6054                goto out_err;
6055
6056        /* If the image supports fancy striping, get its parameters */
6057
6058        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6059                ret = rbd_dev_v2_striping_info(rbd_dev);
6060                if (ret < 0)
6061                        goto out_err;
6062        }
6063        /* Crypto and compression types are not supported for format 2 images */
6064
6065        return 0;
6066out_err:
6067        rbd_dev->header.features = 0;
6068        kfree(rbd_dev->header.object_prefix);
6069        rbd_dev->header.object_prefix = NULL;
6070
6071        return ret;
6072}
6073
6074/*
6075 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6076 * rbd_dev_image_probe() recursion depth, which means it's also the
6077 * length of the already discovered part of the parent chain.
6078 */
6079static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6080{
6081        struct rbd_device *parent = NULL;
6082        int ret;
6083
6084        if (!rbd_dev->parent_spec)
6085                return 0;
6086
6087        if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6088                pr_info("parent chain is too long (%d)\n", depth);
6089                ret = -EINVAL;
6090                goto out_err;
6091        }
6092
6093        parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6094        if (!parent) {
6095                ret = -ENOMEM;
6096                goto out_err;
6097        }
6098
6099        /*
6100         * Images related by parent/child relationships always share
6101         * rbd_client and spec/parent_spec, so bump their refcounts.
6102         */
6103        __rbd_get_client(rbd_dev->rbd_client);
6104        rbd_spec_get(rbd_dev->parent_spec);
6105
6106        ret = rbd_dev_image_probe(parent, depth);
6107        if (ret < 0)
6108                goto out_err;
6109
6110        rbd_dev->parent = parent;
6111        atomic_set(&rbd_dev->parent_ref, 1);
6112        return 0;
6113
6114out_err:
6115        rbd_dev_unparent(rbd_dev);
6116        rbd_dev_destroy(parent);
6117        return ret;
6118}
6119
6120/*
6121 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6122 * upon return.
6123 */
6124static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6125{
6126        int ret;
6127
6128        /* Record our major and minor device numbers. */
6129
6130        if (!single_major) {
6131                ret = register_blkdev(0, rbd_dev->name);
6132                if (ret < 0)
6133                        goto err_out_unlock;
6134
6135                rbd_dev->major = ret;
6136                rbd_dev->minor = 0;
6137        } else {
6138                rbd_dev->major = rbd_major;
6139                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6140        }
6141
6142        /* Set up the blkdev mapping. */
6143
6144        ret = rbd_init_disk(rbd_dev);
6145        if (ret)
6146                goto err_out_blkdev;
6147
6148        ret = rbd_dev_mapping_set(rbd_dev);
6149        if (ret)
6150                goto err_out_disk;
6151
6152        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6153        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6154
6155        dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6156        ret = device_add(&rbd_dev->dev);
6157        if (ret)
6158                goto err_out_mapping;
6159
6160        /* Everything's ready.  Announce the disk to the world. */
6161
6162        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6163        up_write(&rbd_dev->header_rwsem);
6164
6165        spin_lock(&rbd_dev_list_lock);
6166        list_add_tail(&rbd_dev->node, &rbd_dev_list);
6167        spin_unlock(&rbd_dev_list_lock);
6168
6169        add_disk(rbd_dev->disk);
6170        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6171                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6172                rbd_dev->header.features);
6173
6174        return ret;
6175
6176err_out_mapping:
6177        rbd_dev_mapping_clear(rbd_dev);
6178err_out_disk:
6179        rbd_free_disk(rbd_dev);
6180err_out_blkdev:
6181        if (!single_major)
6182                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6183err_out_unlock:
6184        up_write(&rbd_dev->header_rwsem);
6185        return ret;
6186}
6187
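    /*
     * Build the name of the header object for this image: format 1
     * images keep their header in "<image name>" + RBD_SUFFIX, while
     * format 2 images use RBD_HEADER_PREFIX + "<image id>".
     */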
6188static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6189{
6190        struct rbd_spec *spec = rbd_dev->spec;
6191        int ret;
6192
6193        /* Record the header object name for this rbd image. */
6194
6195        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6196
6197        rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
6198        if (rbd_dev->image_format == 1)
6199                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6200                                       spec->image_name, RBD_SUFFIX);
6201        else
6202                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6203                                       RBD_HEADER_PREFIX, spec->image_id);
6204
6205        return ret;
6206}
6207
6208static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6209{
6210        rbd_dev_unprobe(rbd_dev);
6211        rbd_dev->image_format = 0;
6212        kfree(rbd_dev->spec->image_id);
6213        rbd_dev->spec->image_id = NULL;
6214
6215        rbd_dev_destroy(rbd_dev);
6216}
6217
6218/*
6219 * Probe for the existence of the header object for the given rbd
6220 * device.  If this image is the one being mapped (i.e., not a
6221 * parent), initiate a watch on its header object before using that
6222 * object to get detailed information about the rbd image.
6223 */
6224static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6225{
6226        int ret;
6227
6228        /*
6229         * Get the id from the image id object.  Unless there's an
6230         * error, rbd_dev->spec->image_id will be filled in with
6231         * a dynamically-allocated string, and rbd_dev->image_format
6232         * will be set to either 1 or 2.
6233         */
6234        ret = rbd_dev_image_id(rbd_dev);
6235        if (ret)
6236                return ret;
6237
6238        ret = rbd_dev_header_name(rbd_dev);
6239        if (ret)
6240                goto err_out_format;
6241
6242        if (!depth) {
6243                ret = rbd_register_watch(rbd_dev);
6244                if (ret) {
6245                        if (ret == -ENOENT)
6246                                pr_info("image %s/%s does not exist\n",
6247                                        rbd_dev->spec->pool_name,
6248                                        rbd_dev->spec->image_name);
6249                        goto err_out_format;
6250                }
6251        }
6252
6253        ret = rbd_dev_header_info(rbd_dev);
6254        if (ret)
6255                goto err_out_watch;
6256
6257        /*
6258         * If this image is the one being mapped, we have pool name and
6259         * id, image name and id, and snap name - need to fill snap id.
6260         * Otherwise this is a parent image, identified by pool, image
6261         * and snap ids - need to fill in names for those ids.
6262         */
6263        if (!depth)
6264                ret = rbd_spec_fill_snap_id(rbd_dev);
6265        else
6266                ret = rbd_spec_fill_names(rbd_dev);
6267        if (ret) {
6268                if (ret == -ENOENT)
6269                        pr_info("snap %s/%s@%s does not exist\n",
6270                                rbd_dev->spec->pool_name,
6271                                rbd_dev->spec->image_name,
6272                                rbd_dev->spec->snap_name);
6273                goto err_out_probe;
6274        }
6275
6276        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6277                ret = rbd_dev_v2_parent_info(rbd_dev);
6278                if (ret)
6279                        goto err_out_probe;
6280
6281                /*
6282                 * Need to warn users if this image is the one being
6283                 * mapped and has a parent.
6284                 */
6285                if (!depth && rbd_dev->parent_spec)
6286                        rbd_warn(rbd_dev,
6287                                 "WARNING: kernel layering is EXPERIMENTAL!");
6288        }
6289
6290        ret = rbd_dev_probe_parent(rbd_dev, depth);
6291        if (ret)
6292                goto err_out_probe;
6293
6294        dout("discovered format %u image, header name is %s\n",
6295                rbd_dev->image_format, rbd_dev->header_oid.name);
6296        return 0;
6297
6298err_out_probe:
6299        rbd_dev_unprobe(rbd_dev);
6300err_out_watch:
6301        if (!depth)
6302                rbd_unregister_watch(rbd_dev);
6303err_out_format:
6304        rbd_dev->image_format = 0;
6305        kfree(rbd_dev->spec->image_id);
6306        rbd_dev->spec->image_id = NULL;
6307        return ret;
6308}
6309
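    /*
     * Map a new rbd image.  The buffer written to /sys/bus/rbd/add (or
     * add_single_major) is parsed by rbd_add_parse_args(); see
     * Documentation/ABI/testing/sysfs-bus-rbd for the exact syntax,
     * roughly (the monitor address, credentials and names below are
     * only placeholders):
     *
     *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage" \
     *       > /sys/bus/rbd/add
     */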
6310static ssize_t do_rbd_add(struct bus_type *bus,
6311                          const char *buf,
6312                          size_t count)
6313{
6314        struct rbd_device *rbd_dev = NULL;
6315        struct ceph_options *ceph_opts = NULL;
6316        struct rbd_options *rbd_opts = NULL;
6317        struct rbd_spec *spec = NULL;
6318        struct rbd_client *rbdc;
6319        bool read_only;
6320        int rc;
6321
6322        if (!try_module_get(THIS_MODULE))
6323                return -ENODEV;
6324
6325        /* parse add command */
6326        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6327        if (rc < 0)
6328                goto out;
6329
6330        rbdc = rbd_get_client(ceph_opts);
6331        if (IS_ERR(rbdc)) {
6332                rc = PTR_ERR(rbdc);
6333                goto err_out_args;
6334        }
6335
6336        /* pick the pool */
6337        rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6338        if (rc < 0) {
6339                if (rc == -ENOENT)
6340                        pr_info("pool %s does not exist\n", spec->pool_name);
6341                goto err_out_client;
6342        }
6343        spec->pool_id = (u64)rc;
6344
6345        /* The ceph file layout needs the pool id to fit in 32 bits */
6346
6347        if (spec->pool_id > (u64)U32_MAX) {
6348                rbd_warn(NULL, "pool id too large (%llu > %u)",
6349                                (unsigned long long)spec->pool_id, U32_MAX);
6350                rc = -EIO;
6351                goto err_out_client;
6352        }
6353
6354        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6355        if (!rbd_dev) {
6356                rc = -ENOMEM;
6357                goto err_out_client;
6358        }
6359        rbdc = NULL;            /* rbd_dev now owns this */
6360        spec = NULL;            /* rbd_dev now owns this */
6361        rbd_opts = NULL;        /* rbd_dev now owns this */
6362
6363        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6364        if (!rbd_dev->config_info) {
6365                rc = -ENOMEM;
6366                goto err_out_rbd_dev;
6367        }
6368
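            /*
             * The image (and any parent images) are probed with
             * header_rwsem held for write; on success
             * rbd_dev_device_setup() drops the semaphore, on failure
             * it is dropped here.
             */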
6369        down_write(&rbd_dev->header_rwsem);
6370        rc = rbd_dev_image_probe(rbd_dev, 0);
6371        if (rc < 0) {
6372                up_write(&rbd_dev->header_rwsem);
6373                goto err_out_rbd_dev;
6374        }
6375
6376        /* If we are mapping a snapshot it must be marked read-only */
6377
6378        read_only = rbd_dev->opts->read_only;
6379        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6380                read_only = true;
6381        rbd_dev->mapping.read_only = read_only;
6382
6383        rc = rbd_dev_device_setup(rbd_dev);
6384        if (rc) {
6385                /*
6386                 * rbd_unregister_watch() can't be moved into
6387                 * rbd_dev_image_release() without refactoring, see
6388                 * commit 1f3ef78861ac.
6389                 */
6390                rbd_unregister_watch(rbd_dev);
6391                rbd_dev_image_release(rbd_dev);
6392                goto out;
6393        }
6394
6395        rc = count;
6396out:
6397        module_put(THIS_MODULE);
6398        return rc;
6399
6400err_out_rbd_dev:
6401        rbd_dev_destroy(rbd_dev);
6402err_out_client:
6403        rbd_put_client(rbdc);
6404err_out_args:
6405        rbd_spec_put(spec);
6406        kfree(rbd_opts);
6407        goto out;
6408}
6409
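    /*
     * With the single_major module parameter set, images are mapped
     * and unmapped through the add_single_major and
     * remove_single_major attributes; writes to the plain "add" and
     * "remove" attributes are rejected with -EINVAL.
     */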
6410static ssize_t rbd_add(struct bus_type *bus,
6411                       const char *buf,
6412                       size_t count)
6413{
6414        if (single_major)
6415                return -EINVAL;
6416
6417        return do_rbd_add(bus, buf, count);
6418}
6419
6420static ssize_t rbd_add_single_major(struct bus_type *bus,
6421                                    const char *buf,
6422                                    size_t count)
6423{
6424        return do_rbd_add(bus, buf, count);
6425}
6426
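    /*
     * Undo rbd_dev_device_setup(): take the device off the global
     * device list, remove it from sysfs, and release its disk, its
     * mapping and (unless in single_major mode) its block device
     * major.
     */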
6427static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6428{
6429        rbd_free_disk(rbd_dev);
6430
6431        spin_lock(&rbd_dev_list_lock);
6432        list_del_init(&rbd_dev->node);
6433        spin_unlock(&rbd_dev_list_lock);
6434
6435        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6436        device_del(&rbd_dev->dev);
6437        rbd_dev_mapping_clear(rbd_dev);
6438        if (!single_major)
6439                unregister_blkdev(rbd_dev->major, rbd_dev->name);
6440}
6441
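    /*
     * Tear down rbd_dev's parent chain from its far end: each pass
     * walks to the ancestor with no parent of its own, releases it and
     * detaches it from its child, until rbd_dev has no parent left.
     */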
6442static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6443{
6444        while (rbd_dev->parent) {
6445                struct rbd_device *first = rbd_dev;
6446                struct rbd_device *second = first->parent;
6447                struct rbd_device *third;
6448
6449                /*
6450                 * Follow to the parent with no grandparent and
6451                 * remove it.
6452                 */
6453                while (second && (third = second->parent)) {
6454                        first = second;
6455                        second = third;
6456                }
6457                rbd_assert(second);
6458                rbd_dev_image_release(second);
6459                first->parent = NULL;
6460                first->parent_overlap = 0;
6461
6462                rbd_assert(first->parent_spec);
6463                rbd_spec_put(first->parent_spec);
6464                first->parent_spec = NULL;
6465        }
6466}
6467
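    /*
     * Unmap an rbd image.  The buffer written to /sys/bus/rbd/remove
     * (or remove_single_major) is "<dev-id> [force]", e.g.
     *
     *   $ echo "2 force" > /sys/bus/rbd/remove
     *
     * With "force" the device is unmapped even while still open: its
     * queue is frozen and marked dying so no new I/O is accepted.
     */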
6468static ssize_t do_rbd_remove(struct bus_type *bus,
6469                             const char *buf,
6470                             size_t count)
6471{
6472        struct rbd_device *rbd_dev = NULL;
6473        struct list_head *tmp;
6474        int dev_id;
6475        char opt_buf[6];
6476        bool already = false;
6477        bool force = false;
6478        int ret;
6479
6480        dev_id = -1;
6481        opt_buf[0] = '\0';
6482        sscanf(buf, "%d %5s", &dev_id, opt_buf);
6483        if (dev_id < 0) {
6484                pr_err("dev_id out of range\n");
6485                return -EINVAL;
6486        }
6487        if (opt_buf[0] != '\0') {
6488                if (!strcmp(opt_buf, "force")) {
6489                        force = true;
6490                } else {
6491                        pr_err("bad remove option at '%s'\n", opt_buf);
6492                        return -EINVAL;
6493                }
6494        }
6495
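            /*
             * Look the device up by id under the list lock and, unless
             * it is still open (and "force" was not given), mark it as
             * being removed.
             */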
6496        ret = -ENOENT;
6497        spin_lock(&rbd_dev_list_lock);
6498        list_for_each(tmp, &rbd_dev_list) {
6499                rbd_dev = list_entry(tmp, struct rbd_device, node);
6500                if (rbd_dev->dev_id == dev_id) {
6501                        ret = 0;
6502                        break;
6503                }
6504        }
6505        if (!ret) {
6506                spin_lock_irq(&rbd_dev->lock);
6507                if (rbd_dev->open_count && !force)
6508                        ret = -EBUSY;
6509                else
6510                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6511                                                        &rbd_dev->flags);
6512                spin_unlock_irq(&rbd_dev->lock);
6513        }
6514        spin_unlock(&rbd_dev_list_lock);
6515        if (ret < 0 || already)
6516                return ret;
6517
6518        if (force) {
6519                /*
6520                 * Prevent new IO from being queued and wait for existing
6521                 * IO to complete/fail.
6522                 */
6523                blk_mq_freeze_queue(rbd_dev->disk->queue);
6524                blk_set_queue_dying(rbd_dev->disk->queue);
6525        }
6526
6527        down_write(&rbd_dev->lock_rwsem);
6528        if (__rbd_is_lock_owner(rbd_dev))
6529                rbd_unlock(rbd_dev);
6530        up_write(&rbd_dev->lock_rwsem);
6531        rbd_unregister_watch(rbd_dev);
6532
6533        /*
6534         * Don't free anything from rbd_dev->disk until after all
6535         * notifies are completely processed. Otherwise
6536         * rbd_dev_device_release() will race with rbd_watch_cb(), resulting
6537         * in a potential use after free of rbd_dev->disk or rbd_dev.
6538         */
6539        rbd_dev_device_release(rbd_dev);
6540        rbd_dev_image_release(rbd_dev);
6541
6542        return count;
6543}
6544
6545static ssize_t rbd_remove(struct bus_type *bus,
6546                          const char *buf,
6547                          size_t count)
6548{
6549        if (single_major)
6550                return -EINVAL;
6551
6552        return do_rbd_remove(bus, buf, count);
6553}
6554
6555static ssize_t rbd_remove_single_major(struct bus_type *bus,
6556                                       const char *buf,
6557                                       size_t count)
6558{
6559        return do_rbd_remove(bus, buf, count);
6560}
6561
6562/*
6563 * create control files in sysfs
6564 * /sys/bus/rbd/...
6565 */
6566static int rbd_sysfs_init(void)
6567{
6568        int ret;
6569
6570        ret = device_register(&rbd_root_dev);
6571        if (ret < 0)
6572                return ret;
6573
6574        ret = bus_register(&rbd_bus_type);
6575        if (ret < 0)
6576                device_unregister(&rbd_root_dev);
6577
6578        return ret;
6579}
6580
6581static void rbd_sysfs_cleanup(void)
6582{
6583        bus_unregister(&rbd_bus_type);
6584        device_unregister(&rbd_root_dev);
6585}
6586
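    /*
     * Create the slab caches for image requests, object requests and
     * segment names; they are destroyed again by rbd_slab_exit().
     */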
6587static int rbd_slab_init(void)
6588{
6589        rbd_assert(!rbd_img_request_cache);
6590        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6591        if (!rbd_img_request_cache)
6592                return -ENOMEM;
6593
6594        rbd_assert(!rbd_obj_request_cache);
6595        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6596        if (!rbd_obj_request_cache)
6597                goto out_err;
6598
6599        rbd_assert(!rbd_segment_name_cache);
6600        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
6601                                        CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
6602        if (rbd_segment_name_cache)
6603                return 0;
6604out_err:
6605        kmem_cache_destroy(rbd_obj_request_cache);
6606        rbd_obj_request_cache = NULL;
6607
6608        kmem_cache_destroy(rbd_img_request_cache);
6609        rbd_img_request_cache = NULL;
6610
6611        return -ENOMEM;
6612}
6613
6614static void rbd_slab_exit(void)
6615{
6616        rbd_assert(rbd_segment_name_cache);
6617        kmem_cache_destroy(rbd_segment_name_cache);
6618        rbd_segment_name_cache = NULL;
6619
6620        rbd_assert(rbd_obj_request_cache);
6621        kmem_cache_destroy(rbd_obj_request_cache);
6622        rbd_obj_request_cache = NULL;
6623
6624        rbd_assert(rbd_img_request_cache);
6625        kmem_cache_destroy(rbd_img_request_cache);
6626        rbd_img_request_cache = NULL;
6627}
6628
6629static int __init rbd_init(void)
6630{
6631        int rc;
6632
6633        if (!libceph_compatible(NULL)) {
6634                rbd_warn(NULL, "libceph incompatibility (quitting)");
6635                return -EINVAL;
6636        }
6637
6638        rc = rbd_slab_init();
6639        if (rc)
6640                return rc;
6641
6642        /*
6643         * The number of active work items is limited by the number of
6644         * rbd devices * queue depth, so leave @max_active at default.
6645         */
6646        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6647        if (!rbd_wq) {
6648                rc = -ENOMEM;
6649                goto err_out_slab;
6650        }
6651
6652        rbd_bus_type.bus_attrs = rbd_bus_attrs;
6653        if (single_major) {
6654                rbd_major = register_blkdev(0, RBD_DRV_NAME);
6655                if (rbd_major < 0) {
6656                        rc = rbd_major;
6657                        goto err_out_wq;
6658                }
6659                rbd_bus_type.bus_attrs = rbd_bus_attrs_single_major;
6660        }
6661
6662        rc = rbd_sysfs_init();
6663        if (rc)
6664                goto err_out_blkdev;
6665
6666        if (single_major)
6667                pr_info("loaded (major %d)\n", rbd_major);
6668        else
6669                pr_info("loaded\n");
6670
6671        return 0;
6672
6673err_out_blkdev:
6674        if (single_major)
6675                unregister_blkdev(rbd_major, RBD_DRV_NAME);
6676err_out_wq:
6677        destroy_workqueue(rbd_wq);
6678err_out_slab:
6679        rbd_slab_exit();
6680        return rc;
6681}
6682
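    /*
     * Tear everything down in the reverse order of rbd_init(), and
     * destroy the device id ida as well.
     */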
6683static void __exit rbd_exit(void)
6684{
6685        ida_destroy(&rbd_dev_id_ida);
6686        rbd_sysfs_cleanup();
6687        if (single_major)
6688                unregister_blkdev(rbd_major, RBD_DRV_NAME);
6689        destroy_workqueue(rbd_wq);
6690        rbd_slab_exit();
6691}
6692
6693module_init(rbd_init);
6694module_exit(rbd_exit);
6695
6696MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6697MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6698MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6699/* following authorship retained from original osdblk.c */
6700MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6701
6702MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6703MODULE_LICENSE("GPL");
6704