linux/drivers/block/rbd.c
<<
>>
Prefs
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/decode.h>
  35#include <linux/parser.h>
  36#include <linux/bsearch.h>
  37
  38#include <linux/kernel.h>
  39#include <linux/device.h>
  40#include <linux/module.h>
  41#include <linux/fs.h>
  42#include <linux/blkdev.h>
  43#include <linux/slab.h>
  44
  45#include "rbd_types.h"
  46
  47#define RBD_DEBUG       /* Activate rbd_assert() calls */
  48
  49/*
  50 * The basic unit of block I/O is a sector.  It is interpreted in a
  51 * number of contexts in Linux (blk, bio, genhd), but the default is
  52 * universally 512 bytes.  These symbols are just slightly more
  53 * meaningful than the bare numbers they represent.
  54 */
  55#define SECTOR_SHIFT    9
  56#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
  57
  58/*
  59 * Increment the given counter and return its updated value.
  60 * If the counter is already 0 it will not be incremented.
  61 * If the counter is already at its maximum value returns
  62 * -EINVAL without updating it.
  63 */
  64static int atomic_inc_return_safe(atomic_t *v)
  65{
  66        unsigned int counter;
  67
  68        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
  69        if (counter <= (unsigned int)INT_MAX)
  70                return (int)counter;
  71
  72        atomic_dec(v);
  73
  74        return -EINVAL;
  75}
  76
  77/* Decrement the counter.  Return the resulting value, or -EINVAL */
  78static int atomic_dec_return_safe(atomic_t *v)
  79{
  80        int counter;
  81
  82        counter = atomic_dec_return(v);
  83        if (counter >= 0)
  84                return counter;
  85
  86        atomic_inc(v);
  87
  88        return -EINVAL;
  89}
  90
  91#define RBD_DRV_NAME "rbd"
  92#define RBD_DRV_NAME_LONG "rbd (rados block device)"
  93
  94#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
  95
  96#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
  97#define RBD_MAX_SNAP_NAME_LEN   \
  98                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
  99
 100#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
 101
 102#define RBD_SNAP_HEAD_NAME      "-"
 103
 104#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
 105
 106/* This allows a single page to hold an image name sent by OSD */
 107#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
 108#define RBD_IMAGE_ID_LEN_MAX    64
 109
 110#define RBD_OBJ_PREFIX_LEN_MAX  64
 111
 112/* Feature bits */
 113
 114#define RBD_FEATURE_LAYERING    (1<<0)
 115#define RBD_FEATURE_STRIPINGV2  (1<<1)
 116#define RBD_FEATURES_ALL \
 117            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
 118
 119/* Features supported by this (client software) implementation. */
 120
 121#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
 122
 123/*
 124 * An RBD device name will be "rbd#", where the "rbd" comes from
 125 * RBD_DRV_NAME above, and # is a unique integer identifier.
 126 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 127 * enough to hold all possible device names.
 128 */
 129#define DEV_NAME_LEN            32
 130#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
 131
 132/*
 133 * block device image metadata (in-memory version)
 134 */
 135struct rbd_image_header {
 136        /* These six fields never change for a given rbd image */
 137        char *object_prefix;
 138        __u8 obj_order;
 139        __u8 crypt_type;
 140        __u8 comp_type;
 141        u64 stripe_unit;
 142        u64 stripe_count;
 143        u64 features;           /* Might be changeable someday? */
 144
 145        /* The remaining fields need to be updated occasionally */
 146        u64 image_size;
 147        struct ceph_snap_context *snapc;
 148        char *snap_names;       /* format 1 only */
 149        u64 *snap_sizes;        /* format 1 only */
 150};
 151
 152/*
 153 * An rbd image specification.
 154 *
 155 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 156 * identify an image.  Each rbd_dev structure includes a pointer to
 157 * an rbd_spec structure that encapsulates this identity.
 158 *
 159 * Each of the id's in an rbd_spec has an associated name.  For a
 160 * user-mapped image, the names are supplied and the id's associated
 161 * with them are looked up.  For a layered image, a parent image is
 162 * defined by the tuple, and the names are looked up.
 163 *
 164 * An rbd_dev structure contains a parent_spec pointer which is
 165 * non-null if the image it represents is a child in a layered
 166 * image.  This pointer will refer to the rbd_spec structure used
 167 * by the parent rbd_dev for its own identity (i.e., the structure
 168 * is shared between the parent and child).
 169 *
 170 * Since these structures are populated once, during the discovery
 171 * phase of image construction, they are effectively immutable so
 172 * we make no effort to synchronize access to them.
 173 *
 174 * Note that code herein does not assume the image name is known (it
 175 * could be a null pointer).
 176 */
 177struct rbd_spec {
 178        u64             pool_id;
 179        const char      *pool_name;
 180
 181        const char      *image_id;
 182        const char      *image_name;
 183
 184        u64             snap_id;
 185        const char      *snap_name;
 186
 187        struct kref     kref;
 188};
 189
 190/*
 191 * an instance of the client.  multiple devices may share an rbd client.
 192 */
 193struct rbd_client {
 194        struct ceph_client      *client;
 195        struct kref             kref;
 196        struct list_head        node;
 197};
 198
 199struct rbd_img_request;
 200typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
 201
 202#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
 203
 204struct rbd_obj_request;
 205typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 206
 207enum obj_request_type {
 208        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 209};
 210
 211enum obj_req_flags {
 212        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
 213        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
 214        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
 215        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
 216};
 217
 218struct rbd_obj_request {
 219        const char              *object_name;
 220        u64                     offset;         /* object start byte */
 221        u64                     length;         /* bytes from offset */
 222        unsigned long           flags;
 223
 224        /*
 225         * An object request associated with an image will have its
 226         * img_data flag set; a standalone object request will not.
 227         *
 228         * A standalone object request will have which == BAD_WHICH
 229         * and a null obj_request pointer.
 230         *
 231         * An object request initiated in support of a layered image
 232         * object (to check for its existence before a write) will
 233         * have which == BAD_WHICH and a non-null obj_request pointer.
 234         *
 235         * Finally, an object request for rbd image data will have
 236         * which != BAD_WHICH, and will have a non-null img_request
 237         * pointer.  The value of which will be in the range
 238         * 0..(img_request->obj_request_count-1).
 239         */
 240        union {
 241                struct rbd_obj_request  *obj_request;   /* STAT op */
 242                struct {
 243                        struct rbd_img_request  *img_request;
 244                        u64                     img_offset;
 245                        /* links for img_request->obj_requests list */
 246                        struct list_head        links;
 247                };
 248        };
 249        u32                     which;          /* posn image request list */
 250
 251        enum obj_request_type   type;
 252        union {
 253                struct bio      *bio_list;
 254                struct {
 255                        struct page     **pages;
 256                        u32             page_count;
 257                };
 258        };
 259        struct page             **copyup_pages;
 260        u32                     copyup_page_count;
 261
 262        struct ceph_osd_request *osd_req;
 263
 264        u64                     xferred;        /* bytes transferred */
 265        int                     result;
 266
 267        rbd_obj_callback_t      callback;
 268        struct completion       completion;
 269
 270        struct kref             kref;
 271};
 272
 273enum img_req_flags {
 274        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
 275        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
 276        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 277};
 278
 279struct rbd_img_request {
 280        struct rbd_device       *rbd_dev;
 281        u64                     offset; /* starting image byte offset */
 282        u64                     length; /* byte count from offset */
 283        unsigned long           flags;
 284        union {
 285                u64                     snap_id;        /* for reads */
 286                struct ceph_snap_context *snapc;        /* for writes */
 287        };
 288        union {
 289                struct request          *rq;            /* block request */
 290                struct rbd_obj_request  *obj_request;   /* obj req initiator */
 291        };
 292        struct page             **copyup_pages;
 293        u32                     copyup_page_count;
 294        spinlock_t              completion_lock;/* protects next_completion */
 295        u32                     next_completion;
 296        rbd_img_callback_t      callback;
 297        u64                     xferred;/* aggregate bytes transferred */
 298        int                     result; /* first nonzero obj_request result */
 299
 300        u32                     obj_request_count;
 301        struct list_head        obj_requests;   /* rbd_obj_request structs */
 302
 303        struct kref             kref;
 304};
 305
 306#define for_each_obj_request(ireq, oreq) \
 307        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
 308#define for_each_obj_request_from(ireq, oreq) \
 309        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
 310#define for_each_obj_request_safe(ireq, oreq, n) \
 311        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 312
 313struct rbd_mapping {
 314        u64                     size;
 315        u64                     features;
 316        bool                    read_only;
 317};
 318
 319/*
 320 * a single device
 321 */
 322struct rbd_device {
 323        int                     dev_id;         /* blkdev unique id */
 324
 325        int                     major;          /* blkdev assigned major */
 326        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 327
 328        u32                     image_format;   /* Either 1 or 2 */
 329        struct rbd_client       *rbd_client;
 330
 331        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 332
 333        spinlock_t              lock;           /* queue, flags, open_count */
 334
 335        struct rbd_image_header header;
 336        unsigned long           flags;          /* possibly lock protected */
 337        struct rbd_spec         *spec;
 338
 339        char                    *header_name;
 340
 341        struct ceph_file_layout layout;
 342
 343        struct ceph_osd_event   *watch_event;
 344        struct rbd_obj_request  *watch_request;
 345
 346        struct rbd_spec         *parent_spec;
 347        u64                     parent_overlap;
 348        atomic_t                parent_ref;
 349        struct rbd_device       *parent;
 350
 351        /* protects updating the header */
 352        struct rw_semaphore     header_rwsem;
 353
 354        struct rbd_mapping      mapping;
 355
 356        struct list_head        node;
 357
 358        /* sysfs related */
 359        struct device           dev;
 360        unsigned long           open_count;     /* protected by lock */
 361};
 362
 363/*
 364 * Flag bits for rbd_dev->flags.  If atomicity is required,
 365 * rbd_dev->lock is used to protect access.
 366 *
 367 * Currently, only the "removing" flag (which is coupled with the
 368 * "open_count" field) requires atomic access.
 369 */
 370enum rbd_dev_flags {
 371        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
 372        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 373};
 374
 375static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 376
 377static LIST_HEAD(rbd_dev_list);    /* devices */
 378static DEFINE_SPINLOCK(rbd_dev_list_lock);
 379
 380static LIST_HEAD(rbd_client_list);              /* clients */
 381static DEFINE_SPINLOCK(rbd_client_list_lock);
 382
 383/* Slab caches for frequently-allocated structures */
 384
 385static struct kmem_cache        *rbd_img_request_cache;
 386static struct kmem_cache        *rbd_obj_request_cache;
 387static struct kmem_cache        *rbd_segment_name_cache;
 388
 389static int rbd_img_request_submit(struct rbd_img_request *img_request);
 390
 391static void rbd_dev_device_release(struct device *dev);
 392
 393static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 394                       size_t count);
 395static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 396                          size_t count);
 397static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 398static void rbd_spec_put(struct rbd_spec *spec);
 399
 400static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
 401static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
 402
 403static struct attribute *rbd_bus_attrs[] = {
 404        &bus_attr_add.attr,
 405        &bus_attr_remove.attr,
 406        NULL,
 407};
 408ATTRIBUTE_GROUPS(rbd_bus);
 409
 410static struct bus_type rbd_bus_type = {
 411        .name           = "rbd",
 412        .bus_groups     = rbd_bus_groups,
 413};
 414
 415static void rbd_root_dev_release(struct device *dev)
 416{
 417}
 418
 419static struct device rbd_root_dev = {
 420        .init_name =    "rbd",
 421        .release =      rbd_root_dev_release,
 422};
 423
 424static __printf(2, 3)
 425void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 426{
 427        struct va_format vaf;
 428        va_list args;
 429
 430        va_start(args, fmt);
 431        vaf.fmt = fmt;
 432        vaf.va = &args;
 433
 434        if (!rbd_dev)
 435                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 436        else if (rbd_dev->disk)
 437                printk(KERN_WARNING "%s: %s: %pV\n",
 438                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 439        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 440                printk(KERN_WARNING "%s: image %s: %pV\n",
 441                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 442        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 443                printk(KERN_WARNING "%s: id %s: %pV\n",
 444                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 445        else    /* punt */
 446                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 447                        RBD_DRV_NAME, rbd_dev, &vaf);
 448        va_end(args);
 449}
 450
 451#ifdef RBD_DEBUG
 452#define rbd_assert(expr)                                                \
 453                if (unlikely(!(expr))) {                                \
 454                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 455                                                "at line %d:\n\n"       \
 456                                        "\trbd_assert(%s);\n\n",        \
 457                                        __func__, __LINE__, #expr);     \
 458                        BUG();                                          \
 459                }
 460#else /* !RBD_DEBUG */
 461#  define rbd_assert(expr)      ((void) 0)
 462#endif /* !RBD_DEBUG */
 463
 464static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
 465static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 466static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 467
 468static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 469static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
 470static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
 471static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 472                                        u64 snap_id);
 473static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 474                                u8 *order, u64 *snap_size);
 475static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 476                u64 *snap_features);
 477static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
 478
 479static int rbd_open(struct block_device *bdev, fmode_t mode)
 480{
 481        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 482        bool removing = false;
 483
 484        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
 485                return -EROFS;
 486
 487        spin_lock_irq(&rbd_dev->lock);
 488        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 489                removing = true;
 490        else
 491                rbd_dev->open_count++;
 492        spin_unlock_irq(&rbd_dev->lock);
 493        if (removing)
 494                return -ENOENT;
 495
 496        (void) get_device(&rbd_dev->dev);
 497        set_device_ro(bdev, rbd_dev->mapping.read_only);
 498
 499        return 0;
 500}
 501
 502static void rbd_release(struct gendisk *disk, fmode_t mode)
 503{
 504        struct rbd_device *rbd_dev = disk->private_data;
 505        unsigned long open_count_before;
 506
 507        spin_lock_irq(&rbd_dev->lock);
 508        open_count_before = rbd_dev->open_count--;
 509        spin_unlock_irq(&rbd_dev->lock);
 510        rbd_assert(open_count_before > 0);
 511
 512        put_device(&rbd_dev->dev);
 513}
 514
 515static const struct block_device_operations rbd_bd_ops = {
 516        .owner                  = THIS_MODULE,
 517        .open                   = rbd_open,
 518        .release                = rbd_release,
 519};
 520
 521/*
 522 * Initialize an rbd client instance.  Success or not, this function
 523 * consumes ceph_opts.  Caller holds client_mutex.
 524 */
 525static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 526{
 527        struct rbd_client *rbdc;
 528        int ret = -ENOMEM;
 529
 530        dout("%s:\n", __func__);
 531        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 532        if (!rbdc)
 533                goto out_opt;
 534
 535        kref_init(&rbdc->kref);
 536        INIT_LIST_HEAD(&rbdc->node);
 537
 538        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
 539        if (IS_ERR(rbdc->client))
 540                goto out_rbdc;
 541        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 542
 543        ret = ceph_open_session(rbdc->client);
 544        if (ret < 0)
 545                goto out_client;
 546
 547        spin_lock(&rbd_client_list_lock);
 548        list_add_tail(&rbdc->node, &rbd_client_list);
 549        spin_unlock(&rbd_client_list_lock);
 550
 551        dout("%s: rbdc %p\n", __func__, rbdc);
 552
 553        return rbdc;
 554out_client:
 555        ceph_destroy_client(rbdc->client);
 556out_rbdc:
 557        kfree(rbdc);
 558out_opt:
 559        if (ceph_opts)
 560                ceph_destroy_options(ceph_opts);
 561        dout("%s: error %d\n", __func__, ret);
 562
 563        return ERR_PTR(ret);
 564}
 565
 566static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 567{
 568        kref_get(&rbdc->kref);
 569
 570        return rbdc;
 571}
 572
 573/*
 574 * Find a ceph client with specific addr and configuration.  If
 575 * found, bump its reference count.
 576 */
 577static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 578{
 579        struct rbd_client *client_node;
 580        bool found = false;
 581
 582        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 583                return NULL;
 584
 585        spin_lock(&rbd_client_list_lock);
 586        list_for_each_entry(client_node, &rbd_client_list, node) {
 587                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 588                        __rbd_get_client(client_node);
 589
 590                        found = true;
 591                        break;
 592                }
 593        }
 594        spin_unlock(&rbd_client_list_lock);
 595
 596        return found ? client_node : NULL;
 597}
 598
 599/*
 600 * mount options
 601 */
 602enum {
 603        Opt_last_int,
 604        /* int args above */
 605        Opt_last_string,
 606        /* string args above */
 607        Opt_read_only,
 608        Opt_read_write,
 609        /* Boolean args above */
 610        Opt_last_bool,
 611};
 612
 613static match_table_t rbd_opts_tokens = {
 614        /* int args above */
 615        /* string args above */
 616        {Opt_read_only, "read_only"},
 617        {Opt_read_only, "ro"},          /* Alternate spelling */
 618        {Opt_read_write, "read_write"},
 619        {Opt_read_write, "rw"},         /* Alternate spelling */
 620        /* Boolean args above */
 621        {-1, NULL}
 622};
 623
 624struct rbd_options {
 625        bool    read_only;
 626};
 627
 628#define RBD_READ_ONLY_DEFAULT   false
 629
 630static int parse_rbd_opts_token(char *c, void *private)
 631{
 632        struct rbd_options *rbd_opts = private;
 633        substring_t argstr[MAX_OPT_ARGS];
 634        int token, intval, ret;
 635
 636        token = match_token(c, rbd_opts_tokens, argstr);
 637        if (token < 0)
 638                return -EINVAL;
 639
 640        if (token < Opt_last_int) {
 641                ret = match_int(&argstr[0], &intval);
 642                if (ret < 0) {
 643                        pr_err("bad mount option arg (not int) "
 644                               "at '%s'\n", c);
 645                        return ret;
 646                }
 647                dout("got int token %d val %d\n", token, intval);
 648        } else if (token > Opt_last_int && token < Opt_last_string) {
 649                dout("got string token %d val %s\n", token,
 650                     argstr[0].from);
 651        } else if (token > Opt_last_string && token < Opt_last_bool) {
 652                dout("got Boolean token %d\n", token);
 653        } else {
 654                dout("got token %d\n", token);
 655        }
 656
 657        switch (token) {
 658        case Opt_read_only:
 659                rbd_opts->read_only = true;
 660                break;
 661        case Opt_read_write:
 662                rbd_opts->read_only = false;
 663                break;
 664        default:
 665                rbd_assert(false);
 666                break;
 667        }
 668        return 0;
 669}
 670
 671/*
 672 * Get a ceph client with specific addr and configuration, if one does
 673 * not exist create it.  Either way, ceph_opts is consumed by this
 674 * function.
 675 */
 676static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 677{
 678        struct rbd_client *rbdc;
 679
 680        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
 681        rbdc = rbd_client_find(ceph_opts);
 682        if (rbdc)       /* using an existing client */
 683                ceph_destroy_options(ceph_opts);
 684        else
 685                rbdc = rbd_client_create(ceph_opts);
 686        mutex_unlock(&client_mutex);
 687
 688        return rbdc;
 689}
 690
 691/*
 692 * Destroy ceph client
 693 *
 694 * Caller must hold rbd_client_list_lock.
 695 */
 696static void rbd_client_release(struct kref *kref)
 697{
 698        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 699
 700        dout("%s: rbdc %p\n", __func__, rbdc);
 701        spin_lock(&rbd_client_list_lock);
 702        list_del(&rbdc->node);
 703        spin_unlock(&rbd_client_list_lock);
 704
 705        ceph_destroy_client(rbdc->client);
 706        kfree(rbdc);
 707}
 708
 709/*
 710 * Drop reference to ceph client node. If it's not referenced anymore, release
 711 * it.
 712 */
 713static void rbd_put_client(struct rbd_client *rbdc)
 714{
 715        if (rbdc)
 716                kref_put(&rbdc->kref, rbd_client_release);
 717}
 718
 719static bool rbd_image_format_valid(u32 image_format)
 720{
 721        return image_format == 1 || image_format == 2;
 722}
 723
 724static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 725{
 726        size_t size;
 727        u32 snap_count;
 728
 729        /* The header has to start with the magic rbd header text */
 730        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 731                return false;
 732
 733        /* The bio layer requires at least sector-sized I/O */
 734
 735        if (ondisk->options.order < SECTOR_SHIFT)
 736                return false;
 737
 738        /* If we use u64 in a few spots we may be able to loosen this */
 739
 740        if (ondisk->options.order > 8 * sizeof (int) - 1)
 741                return false;
 742
 743        /*
 744         * The size of a snapshot header has to fit in a size_t, and
 745         * that limits the number of snapshots.
 746         */
 747        snap_count = le32_to_cpu(ondisk->snap_count);
 748        size = SIZE_MAX - sizeof (struct ceph_snap_context);
 749        if (snap_count > size / sizeof (__le64))
 750                return false;
 751
 752        /*
 753         * Not only that, but the size of the entire the snapshot
 754         * header must also be representable in a size_t.
 755         */
 756        size -= snap_count * sizeof (__le64);
 757        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
 758                return false;
 759
 760        return true;
 761}
 762
 763/*
 764 * Fill an rbd image header with information from the given format 1
 765 * on-disk header.
 766 */
 767static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 768                                 struct rbd_image_header_ondisk *ondisk)
 769{
 770        struct rbd_image_header *header = &rbd_dev->header;
 771        bool first_time = header->object_prefix == NULL;
 772        struct ceph_snap_context *snapc;
 773        char *object_prefix = NULL;
 774        char *snap_names = NULL;
 775        u64 *snap_sizes = NULL;
 776        u32 snap_count;
 777        size_t size;
 778        int ret = -ENOMEM;
 779        u32 i;
 780
 781        /* Allocate this now to avoid having to handle failure below */
 782
 783        if (first_time) {
 784                size_t len;
 785
 786                len = strnlen(ondisk->object_prefix,
 787                                sizeof (ondisk->object_prefix));
 788                object_prefix = kmalloc(len + 1, GFP_KERNEL);
 789                if (!object_prefix)
 790                        return -ENOMEM;
 791                memcpy(object_prefix, ondisk->object_prefix, len);
 792                object_prefix[len] = '\0';
 793        }
 794
 795        /* Allocate the snapshot context and fill it in */
 796
 797        snap_count = le32_to_cpu(ondisk->snap_count);
 798        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
 799        if (!snapc)
 800                goto out_err;
 801        snapc->seq = le64_to_cpu(ondisk->snap_seq);
 802        if (snap_count) {
 803                struct rbd_image_snap_ondisk *snaps;
 804                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 805
 806                /* We'll keep a copy of the snapshot names... */
 807
 808                if (snap_names_len > (u64)SIZE_MAX)
 809                        goto out_2big;
 810                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
 811                if (!snap_names)
 812                        goto out_err;
 813
 814                /* ...as well as the array of their sizes. */
 815
 816                size = snap_count * sizeof (*header->snap_sizes);
 817                snap_sizes = kmalloc(size, GFP_KERNEL);
 818                if (!snap_sizes)
 819                        goto out_err;
 820
 821                /*
 822                 * Copy the names, and fill in each snapshot's id
 823                 * and size.
 824                 *
 825                 * Note that rbd_dev_v1_header_info() guarantees the
 826                 * ondisk buffer we're working with has
 827                 * snap_names_len bytes beyond the end of the
 828                 * snapshot id array, this memcpy() is safe.
 829                 */
 830                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
 831                snaps = ondisk->snaps;
 832                for (i = 0; i < snap_count; i++) {
 833                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
 834                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
 835                }
 836        }
 837
 838        /* We won't fail any more, fill in the header */
 839
 840        if (first_time) {
 841                header->object_prefix = object_prefix;
 842                header->obj_order = ondisk->options.order;
 843                header->crypt_type = ondisk->options.crypt_type;
 844                header->comp_type = ondisk->options.comp_type;
 845                /* The rest aren't used for format 1 images */
 846                header->stripe_unit = 0;
 847                header->stripe_count = 0;
 848                header->features = 0;
 849        } else {
 850                ceph_put_snap_context(header->snapc);
 851                kfree(header->snap_names);
 852                kfree(header->snap_sizes);
 853        }
 854
 855        /* The remaining fields always get updated (when we refresh) */
 856
 857        header->image_size = le64_to_cpu(ondisk->image_size);
 858        header->snapc = snapc;
 859        header->snap_names = snap_names;
 860        header->snap_sizes = snap_sizes;
 861
 862        /* Make sure mapping size is consistent with header info */
 863
 864        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
 865                if (rbd_dev->mapping.size != header->image_size)
 866                        rbd_dev->mapping.size = header->image_size;
 867
 868        return 0;
 869out_2big:
 870        ret = -EIO;
 871out_err:
 872        kfree(snap_sizes);
 873        kfree(snap_names);
 874        ceph_put_snap_context(snapc);
 875        kfree(object_prefix);
 876
 877        return ret;
 878}
 879
 880static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
 881{
 882        const char *snap_name;
 883
 884        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
 885
 886        /* Skip over names until we find the one we are looking for */
 887
 888        snap_name = rbd_dev->header.snap_names;
 889        while (which--)
 890                snap_name += strlen(snap_name) + 1;
 891
 892        return kstrdup(snap_name, GFP_KERNEL);
 893}
 894
 895/*
 896 * Snapshot id comparison function for use with qsort()/bsearch().
 897 * Note that result is for snapshots in *descending* order.
 898 */
 899static int snapid_compare_reverse(const void *s1, const void *s2)
 900{
 901        u64 snap_id1 = *(u64 *)s1;
 902        u64 snap_id2 = *(u64 *)s2;
 903
 904        if (snap_id1 < snap_id2)
 905                return 1;
 906        return snap_id1 == snap_id2 ? 0 : -1;
 907}
 908
 909/*
 910 * Search a snapshot context to see if the given snapshot id is
 911 * present.
 912 *
 913 * Returns the position of the snapshot id in the array if it's found,
 914 * or BAD_SNAP_INDEX otherwise.
 915 *
 916 * Note: The snapshot array is in kept sorted (by the osd) in
 917 * reverse order, highest snapshot id first.
 918 */
 919static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
 920{
 921        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
 922        u64 *found;
 923
 924        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
 925                                sizeof (snap_id), snapid_compare_reverse);
 926
 927        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
 928}
 929
 930static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
 931                                        u64 snap_id)
 932{
 933        u32 which;
 934        const char *snap_name;
 935
 936        which = rbd_dev_snap_index(rbd_dev, snap_id);
 937        if (which == BAD_SNAP_INDEX)
 938                return ERR_PTR(-ENOENT);
 939
 940        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
 941        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
 942}
 943
 944static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
 945{
 946        if (snap_id == CEPH_NOSNAP)
 947                return RBD_SNAP_HEAD_NAME;
 948
 949        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 950        if (rbd_dev->image_format == 1)
 951                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
 952
 953        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
 954}
 955
 956static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 957                                u64 *snap_size)
 958{
 959        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 960        if (snap_id == CEPH_NOSNAP) {
 961                *snap_size = rbd_dev->header.image_size;
 962        } else if (rbd_dev->image_format == 1) {
 963                u32 which;
 964
 965                which = rbd_dev_snap_index(rbd_dev, snap_id);
 966                if (which == BAD_SNAP_INDEX)
 967                        return -ENOENT;
 968
 969                *snap_size = rbd_dev->header.snap_sizes[which];
 970        } else {
 971                u64 size = 0;
 972                int ret;
 973
 974                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
 975                if (ret)
 976                        return ret;
 977
 978                *snap_size = size;
 979        }
 980        return 0;
 981}
 982
 983static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 984                        u64 *snap_features)
 985{
 986        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 987        if (snap_id == CEPH_NOSNAP) {
 988                *snap_features = rbd_dev->header.features;
 989        } else if (rbd_dev->image_format == 1) {
 990                *snap_features = 0;     /* No features for format 1 */
 991        } else {
 992                u64 features = 0;
 993                int ret;
 994
 995                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
 996                if (ret)
 997                        return ret;
 998
 999                *snap_features = features;
1000        }
1001        return 0;
1002}
1003
1004static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1005{
1006        u64 snap_id = rbd_dev->spec->snap_id;
1007        u64 size = 0;
1008        u64 features = 0;
1009        int ret;
1010
1011        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1012        if (ret)
1013                return ret;
1014        ret = rbd_snap_features(rbd_dev, snap_id, &features);
1015        if (ret)
1016                return ret;
1017
1018        rbd_dev->mapping.size = size;
1019        rbd_dev->mapping.features = features;
1020
1021        return 0;
1022}
1023
1024static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1025{
1026        rbd_dev->mapping.size = 0;
1027        rbd_dev->mapping.features = 0;
1028}
1029
1030static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1031{
1032        char *name;
1033        u64 segment;
1034        int ret;
1035        char *name_format;
1036
1037        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1038        if (!name)
1039                return NULL;
1040        segment = offset >> rbd_dev->header.obj_order;
1041        name_format = "%s.%012llx";
1042        if (rbd_dev->image_format == 2)
1043                name_format = "%s.%016llx";
1044        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
1045                        rbd_dev->header.object_prefix, segment);
1046        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
1047                pr_err("error formatting segment name for #%llu (%d)\n",
1048                        segment, ret);
1049                kfree(name);
1050                name = NULL;
1051        }
1052
1053        return name;
1054}
1055
1056static void rbd_segment_name_free(const char *name)
1057{
1058        /* The explicit cast here is needed to drop the const qualifier */
1059
1060        kmem_cache_free(rbd_segment_name_cache, (void *)name);
1061}
1062
1063static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1064{
1065        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1066
1067        return offset & (segment_size - 1);
1068}
1069
1070static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1071                                u64 offset, u64 length)
1072{
1073        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1074
1075        offset &= segment_size - 1;
1076
1077        rbd_assert(length <= U64_MAX - offset);
1078        if (offset + length > segment_size)
1079                length = segment_size - offset;
1080
1081        return length;
1082}
1083
1084/*
1085 * returns the size of an object in the image
1086 */
1087static u64 rbd_obj_bytes(struct rbd_image_header *header)
1088{
1089        return 1 << header->obj_order;
1090}
1091
1092/*
1093 * bio helpers
1094 */
1095
1096static void bio_chain_put(struct bio *chain)
1097{
1098        struct bio *tmp;
1099
1100        while (chain) {
1101                tmp = chain;
1102                chain = chain->bi_next;
1103                bio_put(tmp);
1104        }
1105}
1106
1107/*
1108 * zeros a bio chain, starting at specific offset
1109 */
1110static void zero_bio_chain(struct bio *chain, int start_ofs)
1111{
1112        struct bio_vec *bv;
1113        unsigned long flags;
1114        void *buf;
1115        int i;
1116        int pos = 0;
1117
1118        while (chain) {
1119                bio_for_each_segment(bv, chain, i) {
1120                        if (pos + bv->bv_len > start_ofs) {
1121                                int remainder = max(start_ofs - pos, 0);
1122                                buf = bvec_kmap_irq(bv, &flags);
1123                                memset(buf + remainder, 0,
1124                                       bv->bv_len - remainder);
1125                                flush_dcache_page(bv->bv_page);
1126                                bvec_kunmap_irq(buf, &flags);
1127                        }
1128                        pos += bv->bv_len;
1129                }
1130
1131                chain = chain->bi_next;
1132        }
1133}
1134
1135/*
1136 * similar to zero_bio_chain(), zeros data defined by a page array,
1137 * starting at the given byte offset from the start of the array and
1138 * continuing up to the given end offset.  The pages array is
1139 * assumed to be big enough to hold all bytes up to the end.
1140 */
1141static void zero_pages(struct page **pages, u64 offset, u64 end)
1142{
1143        struct page **page = &pages[offset >> PAGE_SHIFT];
1144
1145        rbd_assert(end > offset);
1146        rbd_assert(end - offset <= (u64)SIZE_MAX);
1147        while (offset < end) {
1148                size_t page_offset;
1149                size_t length;
1150                unsigned long flags;
1151                void *kaddr;
1152
1153                page_offset = offset & ~PAGE_MASK;
1154                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1155                local_irq_save(flags);
1156                kaddr = kmap_atomic(*page);
1157                memset(kaddr + page_offset, 0, length);
1158                flush_dcache_page(*page);
1159                kunmap_atomic(kaddr);
1160                local_irq_restore(flags);
1161
1162                offset += length;
1163                page++;
1164        }
1165}
1166
1167/*
1168 * Clone a portion of a bio, starting at the given byte offset
1169 * and continuing for the number of bytes indicated.
1170 */
1171static struct bio *bio_clone_range(struct bio *bio_src,
1172                                        unsigned int offset,
1173                                        unsigned int len,
1174                                        gfp_t gfpmask)
1175{
1176        struct bio_vec *bv;
1177        unsigned int resid;
1178        unsigned short idx;
1179        unsigned int voff;
1180        unsigned short end_idx;
1181        unsigned short vcnt;
1182        struct bio *bio;
1183
1184        /* Handle the easy case for the caller */
1185
1186        if (!offset && len == bio_src->bi_size)
1187                return bio_clone(bio_src, gfpmask);
1188
1189        if (WARN_ON_ONCE(!len))
1190                return NULL;
1191        if (WARN_ON_ONCE(len > bio_src->bi_size))
1192                return NULL;
1193        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1194                return NULL;
1195
1196        /* Find first affected segment... */
1197
1198        resid = offset;
1199        bio_for_each_segment(bv, bio_src, idx) {
1200                if (resid < bv->bv_len)
1201                        break;
1202                resid -= bv->bv_len;
1203        }
1204        voff = resid;
1205
1206        /* ...and the last affected segment */
1207
1208        resid += len;
1209        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1210                if (resid <= bv->bv_len)
1211                        break;
1212                resid -= bv->bv_len;
1213        }
1214        vcnt = end_idx - idx + 1;
1215
1216        /* Build the clone */
1217
1218        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1219        if (!bio)
1220                return NULL;    /* ENOMEM */
1221
1222        bio->bi_bdev = bio_src->bi_bdev;
1223        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1224        bio->bi_rw = bio_src->bi_rw;
1225        bio->bi_flags |= 1 << BIO_CLONED;
1226
1227        /*
1228         * Copy over our part of the bio_vec, then update the first
1229         * and last (or only) entries.
1230         */
1231        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1232                        vcnt * sizeof (struct bio_vec));
1233        bio->bi_io_vec[0].bv_offset += voff;
1234        if (vcnt > 1) {
1235                bio->bi_io_vec[0].bv_len -= voff;
1236                bio->bi_io_vec[vcnt - 1].bv_len = resid;
1237        } else {
1238                bio->bi_io_vec[0].bv_len = len;
1239        }
1240
1241        bio->bi_vcnt = vcnt;
1242        bio->bi_size = len;
1243        bio->bi_idx = 0;
1244
1245        return bio;
1246}
1247
1248/*
1249 * Clone a portion of a bio chain, starting at the given byte offset
1250 * into the first bio in the source chain and continuing for the
1251 * number of bytes indicated.  The result is another bio chain of
1252 * exactly the given length, or a null pointer on error.
1253 *
1254 * The bio_src and offset parameters are both in-out.  On entry they
1255 * refer to the first source bio and the offset into that bio where
1256 * the start of data to be cloned is located.
1257 *
1258 * On return, bio_src is updated to refer to the bio in the source
1259 * chain that contains first un-cloned byte, and *offset will
1260 * contain the offset of that byte within that bio.
1261 */
1262static struct bio *bio_chain_clone_range(struct bio **bio_src,
1263                                        unsigned int *offset,
1264                                        unsigned int len,
1265                                        gfp_t gfpmask)
1266{
1267        struct bio *bi = *bio_src;
1268        unsigned int off = *offset;
1269        struct bio *chain = NULL;
1270        struct bio **end;
1271
1272        /* Build up a chain of clone bios up to the limit */
1273
1274        if (!bi || off >= bi->bi_size || !len)
1275                return NULL;            /* Nothing to clone */
1276
1277        end = &chain;
1278        while (len) {
1279                unsigned int bi_size;
1280                struct bio *bio;
1281
1282                if (!bi) {
1283                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1284                        goto out_err;   /* EINVAL; ran out of bio's */
1285                }
1286                bi_size = min_t(unsigned int, bi->bi_size - off, len);
1287                bio = bio_clone_range(bi, off, bi_size, gfpmask);
1288                if (!bio)
1289                        goto out_err;   /* ENOMEM */
1290
1291                *end = bio;
1292                end = &bio->bi_next;
1293
1294                off += bi_size;
1295                if (off == bi->bi_size) {
1296                        bi = bi->bi_next;
1297                        off = 0;
1298                }
1299                len -= bi_size;
1300        }
1301        *bio_src = bi;
1302        *offset = off;
1303
1304        return chain;
1305out_err:
1306        bio_chain_put(chain);
1307
1308        return NULL;
1309}
1310
1311/*
1312 * The default/initial value for all object request flags is 0.  For
1313 * each flag, once its value is set to 1 it is never reset to 0
1314 * again.
1315 */
1316static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1317{
1318        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1319                struct rbd_device *rbd_dev;
1320
1321                rbd_dev = obj_request->img_request->rbd_dev;
1322                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1323                        obj_request);
1324        }
1325}
1326
1327static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1328{
1329        smp_mb();
1330        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1331}
1332
1333static void obj_request_done_set(struct rbd_obj_request *obj_request)
1334{
1335        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1336                struct rbd_device *rbd_dev = NULL;
1337
1338                if (obj_request_img_data_test(obj_request))
1339                        rbd_dev = obj_request->img_request->rbd_dev;
1340                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1341                        obj_request);
1342        }
1343}
1344
1345static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1346{
1347        smp_mb();
1348        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1349}
1350
1351/*
1352 * This sets the KNOWN flag after (possibly) setting the EXISTS
1353 * flag.  The latter is set based on the "exists" value provided.
1354 *
1355 * Note that for our purposes once an object exists it never goes
1356 * away again.  It's possible that the response from two existence
1357 * checks are separated by the creation of the target object, and
1358 * the first ("doesn't exist") response arrives *after* the second
1359 * ("does exist").  In that case we ignore the second one.
1360 */
1361static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1362                                bool exists)
1363{
1364        if (exists)
1365                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1366        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1367        smp_mb();
1368}
1369
1370static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1371{
1372        smp_mb();
1373        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1374}
1375
1376static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1377{
1378        smp_mb();
1379        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1380}
1381
1382static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1383{
1384        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1385                atomic_read(&obj_request->kref.refcount));
1386        kref_get(&obj_request->kref);
1387}
1388
1389static void rbd_obj_request_destroy(struct kref *kref);
1390static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1391{
1392        rbd_assert(obj_request != NULL);
1393        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1394                atomic_read(&obj_request->kref.refcount));
1395        kref_put(&obj_request->kref, rbd_obj_request_destroy);
1396}
1397
1398static bool img_request_child_test(struct rbd_img_request *img_request);
1399static void rbd_parent_request_destroy(struct kref *kref);
1400static void rbd_img_request_destroy(struct kref *kref);
1401static void rbd_img_request_put(struct rbd_img_request *img_request)
1402{
1403        rbd_assert(img_request != NULL);
1404        dout("%s: img %p (was %d)\n", __func__, img_request,
1405                atomic_read(&img_request->kref.refcount));
1406        if (img_request_child_test(img_request))
1407                kref_put(&img_request->kref, rbd_parent_request_destroy);
1408        else
1409                kref_put(&img_request->kref, rbd_img_request_destroy);
1410}
1411
1412static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1413                                        struct rbd_obj_request *obj_request)
1414{
1415        rbd_assert(obj_request->img_request == NULL);
1416
1417        /* Image request now owns object's original reference */
1418        obj_request->img_request = img_request;
1419        obj_request->which = img_request->obj_request_count;
1420        rbd_assert(!obj_request_img_data_test(obj_request));
1421        obj_request_img_data_set(obj_request);
1422        rbd_assert(obj_request->which != BAD_WHICH);
1423        img_request->obj_request_count++;
1424        list_add_tail(&obj_request->links, &img_request->obj_requests);
1425        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1426                obj_request->which);
1427}
1428
1429static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1430                                        struct rbd_obj_request *obj_request)
1431{
1432        rbd_assert(obj_request->which != BAD_WHICH);
1433
1434        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1435                obj_request->which);
1436        list_del(&obj_request->links);
1437        rbd_assert(img_request->obj_request_count > 0);
1438        img_request->obj_request_count--;
1439        rbd_assert(obj_request->which == img_request->obj_request_count);
1440        obj_request->which = BAD_WHICH;
1441        rbd_assert(obj_request_img_data_test(obj_request));
1442        rbd_assert(obj_request->img_request == img_request);
1443        obj_request->img_request = NULL;
1444        obj_request->callback = NULL;
1445        rbd_obj_request_put(obj_request);
1446}
1447
1448static bool obj_request_type_valid(enum obj_request_type type)
1449{
1450        switch (type) {
1451        case OBJ_REQUEST_NODATA:
1452        case OBJ_REQUEST_BIO:
1453        case OBJ_REQUEST_PAGES:
1454                return true;
1455        default:
1456                return false;
1457        }
1458}
1459
1460static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1461                                struct rbd_obj_request *obj_request)
1462{
1463        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1464
1465        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1466}
1467
1468static void rbd_img_request_complete(struct rbd_img_request *img_request)
1469{
1470
1471        dout("%s: img %p\n", __func__, img_request);
1472
1473        /*
1474         * If no error occurred, compute the aggregate transfer
1475         * count for the image request.  We could instead use
1476         * atomic64_cmpxchg() to update it as each object request
1477         * completes; not clear which way is better off hand.
1478         */
1479        if (!img_request->result) {
1480                struct rbd_obj_request *obj_request;
1481                u64 xferred = 0;
1482
1483                for_each_obj_request(img_request, obj_request)
1484                        xferred += obj_request->xferred;
1485                img_request->xferred = xferred;
1486        }
1487
1488        if (img_request->callback)
1489                img_request->callback(img_request);
1490        else
1491                rbd_img_request_put(img_request);
1492}
1493
1494/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1495
1496static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1497{
1498        dout("%s: obj %p\n", __func__, obj_request);
1499
1500        return wait_for_completion_interruptible(&obj_request->completion);
1501}
1502
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time.
 * The child and layered flags also have helpers below that clear
 * them again.
 */
1508static void img_request_write_set(struct rbd_img_request *img_request)
1509{
1510        set_bit(IMG_REQ_WRITE, &img_request->flags);
1511        smp_mb();
1512}
1513
1514static bool img_request_write_test(struct rbd_img_request *img_request)
1515{
1516        smp_mb();
1517        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1518}
1519
1520static void img_request_child_set(struct rbd_img_request *img_request)
1521{
1522        set_bit(IMG_REQ_CHILD, &img_request->flags);
1523        smp_mb();
1524}
1525
1526static void img_request_child_clear(struct rbd_img_request *img_request)
1527{
1528        clear_bit(IMG_REQ_CHILD, &img_request->flags);
1529        smp_mb();
1530}
1531
1532static bool img_request_child_test(struct rbd_img_request *img_request)
1533{
1534        smp_mb();
1535        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1536}
1537
1538static void img_request_layered_set(struct rbd_img_request *img_request)
1539{
1540        set_bit(IMG_REQ_LAYERED, &img_request->flags);
1541        smp_mb();
1542}
1543
1544static void img_request_layered_clear(struct rbd_img_request *img_request)
1545{
1546        clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1547        smp_mb();
1548}
1549
1550static bool img_request_layered_test(struct rbd_img_request *img_request)
1551{
1552        smp_mb();
1553        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1554}
1555
1556static void
1557rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1558{
1559        u64 xferred = obj_request->xferred;
1560        u64 length = obj_request->length;
1561
1562        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1563                obj_request, obj_request->img_request, obj_request->result,
1564                xferred, length);
1565        /*
1566         * ENOENT means a hole in the image.  We zero-fill the entire
1567         * length of the request.  A short read also implies zero-fill
1568         * to the end of the request.  An error requires the whole
1569         * length of the request to be reported finished with an error
1570         * to the block layer.  In each case we update the xferred
1571         * count to indicate the whole request was satisfied.
1572         */
1573        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1574        if (obj_request->result == -ENOENT) {
1575                if (obj_request->type == OBJ_REQUEST_BIO)
1576                        zero_bio_chain(obj_request->bio_list, 0);
1577                else
1578                        zero_pages(obj_request->pages, 0, length);
1579                obj_request->result = 0;
1580        } else if (xferred < length && !obj_request->result) {
1581                if (obj_request->type == OBJ_REQUEST_BIO)
1582                        zero_bio_chain(obj_request->bio_list, xferred);
1583                else
1584                        zero_pages(obj_request->pages, xferred, length);
1585        }
1586        obj_request->xferred = length;
1587        obj_request_done_set(obj_request);
1588}
1589
1590static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1591{
1592        dout("%s: obj %p cb %p\n", __func__, obj_request,
1593                obj_request->callback);
1594        if (obj_request->callback)
1595                obj_request->callback(obj_request);
1596        else
1597                complete_all(&obj_request->completion);
1598}
1599
/* Nothing to process for these ops; just mark the object request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1605
1606static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1607{
1608        struct rbd_img_request *img_request = NULL;
1609        struct rbd_device *rbd_dev = NULL;
1610        bool layered = false;
1611
1612        if (obj_request_img_data_test(obj_request)) {
1613                img_request = obj_request->img_request;
1614                layered = img_request && img_request_layered_test(img_request);
1615                rbd_dev = img_request->rbd_dev;
1616        }
1617
1618        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1619                obj_request, img_request, obj_request->result,
1620                obj_request->xferred, obj_request->length);
1621        if (layered && obj_request->result == -ENOENT &&
1622                        obj_request->img_offset < rbd_dev->parent_overlap)
1623                rbd_img_parent_read(obj_request);
1624        else if (img_request)
1625                rbd_img_obj_request_read_callback(obj_request);
1626        else
1627                obj_request_done_set(obj_request);
1628}
1629
1630static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1631{
1632        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1633                obj_request->result, obj_request->length);
1634        /*
1635         * There is no such thing as a successful short write.  Set
1636         * it to our originally-requested length.
1637         */
1638        obj_request->xferred = obj_request->length;
1639        obj_request_done_set(obj_request);
1640}
1641
/*
 * A plain stat call needs no further processing, so the request is
 * just marked done.  We'll do more if this is part of a write
 * sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1651
1652static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1653                                struct ceph_msg *msg)
1654{
1655        struct rbd_obj_request *obj_request = osd_req->r_priv;
1656        u16 opcode;
1657
1658        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1659        rbd_assert(osd_req == obj_request->osd_req);
1660        if (obj_request_img_data_test(obj_request)) {
1661                rbd_assert(obj_request->img_request);
1662                rbd_assert(obj_request->which != BAD_WHICH);
1663        } else {
1664                rbd_assert(obj_request->which == BAD_WHICH);
1665        }
1666
1667        if (osd_req->r_result < 0)
1668                obj_request->result = osd_req->r_result;
1669
1670        BUG_ON(osd_req->r_num_ops > 2);
1671
1672        /*
1673         * We support a 64-bit length, but ultimately it has to be
1674         * passed to blk_end_request(), which takes an unsigned int.
1675         */
1676        obj_request->xferred = osd_req->r_reply_op_len[0];
1677        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1678        opcode = osd_req->r_ops[0].op;
1679        switch (opcode) {
1680        case CEPH_OSD_OP_READ:
1681                rbd_osd_read_callback(obj_request);
1682                break;
1683        case CEPH_OSD_OP_WRITE:
1684                rbd_osd_write_callback(obj_request);
1685                break;
1686        case CEPH_OSD_OP_STAT:
1687                rbd_osd_stat_callback(obj_request);
1688                break;
1689        case CEPH_OSD_OP_CALL:
1690        case CEPH_OSD_OP_NOTIFY_ACK:
1691        case CEPH_OSD_OP_WATCH:
1692                rbd_osd_trivial_callback(obj_request);
1693                break;
1694        default:
1695                rbd_warn(NULL, "%s: unsupported op %hu\n",
1696                        obj_request->object_name, (unsigned short) opcode);
1697                break;
1698        }
1699
1700        if (obj_request_done_test(obj_request))
1701                rbd_obj_request_complete(obj_request);
1702}
1703
1704static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1705{
1706        struct rbd_img_request *img_request = obj_request->img_request;
1707        struct ceph_osd_request *osd_req = obj_request->osd_req;
1708        u64 snap_id;
1709
1710        rbd_assert(osd_req != NULL);
1711
1712        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1713        ceph_osdc_build_request(osd_req, obj_request->offset,
1714                        NULL, snap_id, NULL);
1715}
1716
1717static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1718{
1719        struct rbd_img_request *img_request = obj_request->img_request;
1720        struct ceph_osd_request *osd_req = obj_request->osd_req;
1721        struct ceph_snap_context *snapc;
1722        struct timespec mtime = CURRENT_TIME;
1723
1724        rbd_assert(osd_req != NULL);
1725
1726        snapc = img_request ? img_request->snapc : NULL;
1727        ceph_osdc_build_request(osd_req, obj_request->offset,
1728                        snapc, CEPH_NOSNAP, &mtime);
1729}
1730
1731static struct ceph_osd_request *rbd_osd_req_create(
1732                                        struct rbd_device *rbd_dev,
1733                                        bool write_request,
1734                                        struct rbd_obj_request *obj_request)
1735{
1736        struct ceph_snap_context *snapc = NULL;
1737        struct ceph_osd_client *osdc;
1738        struct ceph_osd_request *osd_req;
1739
1740        if (obj_request_img_data_test(obj_request)) {
1741                struct rbd_img_request *img_request = obj_request->img_request;
1742
1743                rbd_assert(write_request ==
1744                                img_request_write_test(img_request));
1745                if (write_request)
1746                        snapc = img_request->snapc;
1747        }
1748
1749        /* Allocate and initialize the request, for the single op */
1750
1751        osdc = &rbd_dev->rbd_client->client->osdc;
1752        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1753        if (!osd_req)
1754                return NULL;    /* ENOMEM */
1755
1756        if (write_request)
1757                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1758        else
1759                osd_req->r_flags = CEPH_OSD_FLAG_READ;
1760
1761        osd_req->r_callback = rbd_osd_req_callback;
1762        osd_req->r_priv = obj_request;
1763
1764        osd_req->r_oid_len = strlen(obj_request->object_name);
1765        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1766        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1767
1768        osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1769
1770        return osd_req;
1771}
1772
1773/*
1774 * Create a copyup osd request based on the information in the
1775 * object request supplied.  A copyup request has two osd ops,
1776 * a copyup method call, and a "normal" write request.
1777 */
1778static struct ceph_osd_request *
1779rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1780{
1781        struct rbd_img_request *img_request;
1782        struct ceph_snap_context *snapc;
1783        struct rbd_device *rbd_dev;
1784        struct ceph_osd_client *osdc;
1785        struct ceph_osd_request *osd_req;
1786
1787        rbd_assert(obj_request_img_data_test(obj_request));
1788        img_request = obj_request->img_request;
1789        rbd_assert(img_request);
1790        rbd_assert(img_request_write_test(img_request));
1791
1792        /* Allocate and initialize the request, for the two ops */
1793
1794        snapc = img_request->snapc;
1795        rbd_dev = img_request->rbd_dev;
1796        osdc = &rbd_dev->rbd_client->client->osdc;
1797        osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1798        if (!osd_req)
1799                return NULL;    /* ENOMEM */
1800
1801        osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1802        osd_req->r_callback = rbd_osd_req_callback;
1803        osd_req->r_priv = obj_request;
1804
1805        osd_req->r_oid_len = strlen(obj_request->object_name);
1806        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1807        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1808
1809        osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1810
1811        return osd_req;
1812}
1813
1814
/* Drop our reference to an osd request created for an object request */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1819
1820/* object_name is assumed to be a non-null pointer and NUL-terminated */
1821
1822static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1823                                                u64 offset, u64 length,
1824                                                enum obj_request_type type)
1825{
1826        struct rbd_obj_request *obj_request;
1827        size_t size;
1828        char *name;
1829
1830        rbd_assert(obj_request_type_valid(type));
1831
1832        size = strlen(object_name) + 1;
1833        name = kmalloc(size, GFP_KERNEL);
1834        if (!name)
1835                return NULL;
1836
1837        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1838        if (!obj_request) {
1839                kfree(name);
1840                return NULL;
1841        }
1842
1843        obj_request->object_name = memcpy(name, object_name, size);
1844        obj_request->offset = offset;
1845        obj_request->length = length;
1846        obj_request->flags = 0;
1847        obj_request->which = BAD_WHICH;
1848        obj_request->type = type;
1849        INIT_LIST_HEAD(&obj_request->links);
1850        init_completion(&obj_request->completion);
1851        kref_init(&obj_request->kref);
1852
1853        dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1854                offset, length, (int)type, obj_request);
1855
1856        return obj_request;
1857}
1858
1859static void rbd_obj_request_destroy(struct kref *kref)
1860{
1861        struct rbd_obj_request *obj_request;
1862
1863        obj_request = container_of(kref, struct rbd_obj_request, kref);
1864
1865        dout("%s: obj %p\n", __func__, obj_request);
1866
1867        rbd_assert(obj_request->img_request == NULL);
1868        rbd_assert(obj_request->which == BAD_WHICH);
1869
1870        if (obj_request->osd_req)
1871                rbd_osd_req_destroy(obj_request->osd_req);
1872
1873        rbd_assert(obj_request_type_valid(obj_request->type));
1874        switch (obj_request->type) {
1875        case OBJ_REQUEST_NODATA:
1876                break;          /* Nothing to do */
1877        case OBJ_REQUEST_BIO:
1878                if (obj_request->bio_list)
1879                        bio_chain_put(obj_request->bio_list);
1880                break;
1881        case OBJ_REQUEST_PAGES:
1882                if (obj_request->pages)
1883                        ceph_release_page_vector(obj_request->pages,
1884                                                obj_request->page_count);
1885                break;
1886        }
1887
1888        kfree(obj_request->object_name);
1889        obj_request->object_name = NULL;
1890        kmem_cache_free(rbd_obj_request_cache, obj_request);
1891}
1892
1893/* It's OK to call this for a device with no parent */
1894
1895static void rbd_spec_put(struct rbd_spec *spec);
1896static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1897{
1898        rbd_dev_remove_parent(rbd_dev);
1899        rbd_spec_put(rbd_dev->parent_spec);
1900        rbd_dev->parent_spec = NULL;
1901        rbd_dev->parent_overlap = 0;
1902}
1903
1904/*
1905 * Parent image reference counting is used to determine when an
1906 * image's parent fields can be safely torn down--after there are no
1907 * more in-flight requests to the parent image.  When the last
1908 * reference is dropped, cleaning them up is safe.
1909 */
1910static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1911{
1912        int counter;
1913
1914        if (!rbd_dev->parent_spec)
1915                return;
1916
1917        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1918        if (counter > 0)
1919                return;
1920
1921        /* Last reference; clean up parent data structures */
1922
1923        if (!counter)
1924                rbd_dev_unparent(rbd_dev);
1925        else
1926                rbd_warn(rbd_dev, "parent reference underflow\n");
1927}
1928
1929/*
1930 * If an image has a non-zero parent overlap, get a reference to its
1931 * parent.
1932 *
1933 * We must get the reference before checking for the overlap to
1934 * coordinate properly with zeroing the parent overlap in
1935 * rbd_dev_v2_parent_info() when an image gets flattened.  We
1936 * drop it again if there is no overlap.
1937 *
1938 * Returns true if the rbd device has a parent with a non-zero
1939 * overlap and a reference for it was successfully taken, or
1940 * false otherwise.
1941 */
1942static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1943{
1944        int counter;
1945
1946        if (!rbd_dev->parent_spec)
1947                return false;
1948
1949        counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1950        if (counter > 0 && rbd_dev->parent_overlap)
1951                return true;
1952
1953        /* Image was flattened, but parent is not yet torn down */
1954
1955        if (counter < 0)
1956                rbd_warn(rbd_dev, "parent reference overflow\n");
1957
1958        return false;
1959}
1960
1961/*
1962 * Caller is responsible for filling in the list of object requests
1963 * that comprises the image request, and the Linux request pointer
1964 * (if there is one).
1965 */
1966static struct rbd_img_request *rbd_img_request_create(
1967                                        struct rbd_device *rbd_dev,
1968                                        u64 offset, u64 length,
1969                                        bool write_request)
1970{
1971        struct rbd_img_request *img_request;
1972
1973        img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1974        if (!img_request)
1975                return NULL;
1976
1977        if (write_request) {
1978                down_read(&rbd_dev->header_rwsem);
1979                ceph_get_snap_context(rbd_dev->header.snapc);
1980                up_read(&rbd_dev->header_rwsem);
1981        }
1982
1983        img_request->rq = NULL;
1984        img_request->rbd_dev = rbd_dev;
1985        img_request->offset = offset;
1986        img_request->length = length;
1987        img_request->flags = 0;
1988        if (write_request) {
1989                img_request_write_set(img_request);
1990                img_request->snapc = rbd_dev->header.snapc;
1991        } else {
1992                img_request->snap_id = rbd_dev->spec->snap_id;
1993        }
1994        if (rbd_dev_parent_get(rbd_dev))
1995                img_request_layered_set(img_request);
1996        spin_lock_init(&img_request->completion_lock);
1997        img_request->next_completion = 0;
1998        img_request->callback = NULL;
1999        img_request->result = 0;
2000        img_request->obj_request_count = 0;
2001        INIT_LIST_HEAD(&img_request->obj_requests);
2002        kref_init(&img_request->kref);
2003
2004        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2005                write_request ? "write" : "read", offset, length,
2006                img_request);
2007
2008        return img_request;
2009}
2010
2011static void rbd_img_request_destroy(struct kref *kref)
2012{
2013        struct rbd_img_request *img_request;
2014        struct rbd_obj_request *obj_request;
2015        struct rbd_obj_request *next_obj_request;
2016
2017        img_request = container_of(kref, struct rbd_img_request, kref);
2018
2019        dout("%s: img %p\n", __func__, img_request);
2020
2021        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2022                rbd_img_obj_request_del(img_request, obj_request);
2023        rbd_assert(img_request->obj_request_count == 0);
2024
2025        if (img_request_layered_test(img_request)) {
2026                img_request_layered_clear(img_request);
2027                rbd_dev_parent_put(img_request->rbd_dev);
2028        }
2029
2030        if (img_request_write_test(img_request))
2031                ceph_put_snap_context(img_request->snapc);
2032
2033        kmem_cache_free(rbd_img_request_cache, img_request);
2034}
2035
2036static struct rbd_img_request *rbd_parent_request_create(
2037                                        struct rbd_obj_request *obj_request,
2038                                        u64 img_offset, u64 length)
2039{
2040        struct rbd_img_request *parent_request;
2041        struct rbd_device *rbd_dev;
2042
2043        rbd_assert(obj_request->img_request);
2044        rbd_dev = obj_request->img_request->rbd_dev;
2045
2046        parent_request = rbd_img_request_create(rbd_dev->parent,
2047                                                img_offset, length, false);
2048        if (!parent_request)
2049                return NULL;
2050
2051        img_request_child_set(parent_request);
2052        rbd_obj_request_get(obj_request);
2053        parent_request->obj_request = obj_request;
2054
2055        return parent_request;
2056}
2057
2058static void rbd_parent_request_destroy(struct kref *kref)
2059{
2060        struct rbd_img_request *parent_request;
2061        struct rbd_obj_request *orig_request;
2062
2063        parent_request = container_of(kref, struct rbd_img_request, kref);
2064        orig_request = parent_request->obj_request;
2065
2066        parent_request->obj_request = NULL;
2067        rbd_obj_request_put(orig_request);
2068        img_request_child_clear(parent_request);
2069
2070        rbd_img_request_destroy(kref);
2071}
2072
2073static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2074{
2075        struct rbd_img_request *img_request;
2076        unsigned int xferred;
2077        int result;
2078        bool more;
2079
2080        rbd_assert(obj_request_img_data_test(obj_request));
2081        img_request = obj_request->img_request;
2082
2083        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2084        xferred = (unsigned int)obj_request->xferred;
2085        result = obj_request->result;
2086        if (result) {
2087                struct rbd_device *rbd_dev = img_request->rbd_dev;
2088
2089                rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2090                        img_request_write_test(img_request) ? "write" : "read",
2091                        obj_request->length, obj_request->img_offset,
2092                        obj_request->offset);
2093                rbd_warn(rbd_dev, "  result %d xferred %x\n",
2094                        result, xferred);
2095                if (!img_request->result)
2096                        img_request->result = result;
2097        }
2098
2099        /* Image object requests don't own their page array */
2100
2101        if (obj_request->type == OBJ_REQUEST_PAGES) {
2102                obj_request->pages = NULL;
2103                obj_request->page_count = 0;
2104        }
2105
2106        if (img_request_child_test(img_request)) {
2107                rbd_assert(img_request->obj_request != NULL);
2108                more = obj_request->which < img_request->obj_request_count - 1;
2109        } else {
2110                rbd_assert(img_request->rq != NULL);
2111                more = blk_end_request(img_request->rq, result, xferred);
2112        }
2113
2114        return more;
2115}
2116
2117static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2118{
2119        struct rbd_img_request *img_request;
2120        u32 which = obj_request->which;
2121        bool more = true;
2122
2123        rbd_assert(obj_request_img_data_test(obj_request));
2124        img_request = obj_request->img_request;
2125
2126        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2127        rbd_assert(img_request != NULL);
2128        rbd_assert(img_request->obj_request_count > 0);
2129        rbd_assert(which != BAD_WHICH);
2130        rbd_assert(which < img_request->obj_request_count);
2131        rbd_assert(which >= img_request->next_completion);
2132
2133        spin_lock_irq(&img_request->completion_lock);
2134        if (which != img_request->next_completion)
2135                goto out;
2136
2137        for_each_obj_request_from(img_request, obj_request) {
2138                rbd_assert(more);
2139                rbd_assert(which < img_request->obj_request_count);
2140
2141                if (!obj_request_done_test(obj_request))
2142                        break;
2143                more = rbd_img_obj_end_request(obj_request);
2144                which++;
2145        }
2146
2147        rbd_assert(more ^ (which == img_request->obj_request_count));
2148        img_request->next_completion = which;
2149out:
2150        spin_unlock_irq(&img_request->completion_lock);
2151
2152        if (!more)
2153                rbd_img_request_complete(img_request);
2154}
2155
2156/*
2157 * Split up an image request into one or more object requests, each
2158 * to a different object.  The "type" parameter indicates whether
2159 * "data_desc" is the pointer to the head of a list of bio
2160 * structures, or the base of a page array.  In either case this
2161 * function assumes data_desc describes memory sufficient to hold
2162 * all data described by the image request.
2163 */
2164static int rbd_img_request_fill(struct rbd_img_request *img_request,
2165                                        enum obj_request_type type,
2166                                        void *data_desc)
2167{
2168        struct rbd_device *rbd_dev = img_request->rbd_dev;
2169        struct rbd_obj_request *obj_request = NULL;
2170        struct rbd_obj_request *next_obj_request;
2171        bool write_request = img_request_write_test(img_request);
2172        struct bio *bio_list = NULL;
2173        unsigned int bio_offset = 0;
2174        struct page **pages = NULL;
2175        u64 img_offset;
2176        u64 resid;
2177        u16 opcode;
2178
2179        dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2180                (int)type, data_desc);
2181
2182        opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2183        img_offset = img_request->offset;
2184        resid = img_request->length;
2185        rbd_assert(resid > 0);
2186
2187        if (type == OBJ_REQUEST_BIO) {
2188                bio_list = data_desc;
2189                rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2190        } else {
2191                rbd_assert(type == OBJ_REQUEST_PAGES);
2192                pages = data_desc;
2193        }
2194
2195        while (resid) {
2196                struct ceph_osd_request *osd_req;
2197                const char *object_name;
2198                u64 offset;
2199                u64 length;
2200
2201                object_name = rbd_segment_name(rbd_dev, img_offset);
2202                if (!object_name)
2203                        goto out_unwind;
2204                offset = rbd_segment_offset(rbd_dev, img_offset);
2205                length = rbd_segment_length(rbd_dev, img_offset, resid);
2206                obj_request = rbd_obj_request_create(object_name,
2207                                                offset, length, type);
2208                /* object request has its own copy of the object name */
2209                rbd_segment_name_free(object_name);
2210                if (!obj_request)
2211                        goto out_unwind;
2212                /*
2213                 * set obj_request->img_request before creating the
2214                 * osd_request so that it gets the right snapc
2215                 */
2216                rbd_img_obj_request_add(img_request, obj_request);
2217
2218                if (type == OBJ_REQUEST_BIO) {
2219                        unsigned int clone_size;
2220
2221                        rbd_assert(length <= (u64)UINT_MAX);
2222                        clone_size = (unsigned int)length;
2223                        obj_request->bio_list =
2224                                        bio_chain_clone_range(&bio_list,
2225                                                                &bio_offset,
2226                                                                clone_size,
2227                                                                GFP_ATOMIC);
2228                        if (!obj_request->bio_list)
2229                                goto out_partial;
2230                } else {
2231                        unsigned int page_count;
2232
2233                        obj_request->pages = pages;
2234                        page_count = (u32)calc_pages_for(offset, length);
2235                        obj_request->page_count = page_count;
2236                        if ((offset + length) & ~PAGE_MASK)
2237                                page_count--;   /* more on last page */
2238                        pages += page_count;
2239                }
2240
2241                osd_req = rbd_osd_req_create(rbd_dev, write_request,
2242                                                obj_request);
2243                if (!osd_req)
2244                        goto out_partial;
2245                obj_request->osd_req = osd_req;
2246                obj_request->callback = rbd_img_obj_callback;
2247
2248                osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2249                                                0, 0);
2250                if (type == OBJ_REQUEST_BIO)
2251                        osd_req_op_extent_osd_data_bio(osd_req, 0,
2252                                        obj_request->bio_list, length);
2253                else
2254                        osd_req_op_extent_osd_data_pages(osd_req, 0,
2255                                        obj_request->pages, length,
2256                                        offset & ~PAGE_MASK, false, false);
2257
2258                if (write_request)
2259                        rbd_osd_req_format_write(obj_request);
2260                else
2261                        rbd_osd_req_format_read(obj_request);
2262
2263                obj_request->img_offset = img_offset;
2264
2265                img_offset += length;
2266                resid -= length;
2267        }
2268
2269        return 0;
2270
2271out_partial:
2272        rbd_obj_request_put(obj_request);
2273out_unwind:
2274        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2275                rbd_obj_request_put(obj_request);
2276
2277        return -ENOMEM;
2278}
2279
2280static void
2281rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2282{
2283        struct rbd_img_request *img_request;
2284        struct rbd_device *rbd_dev;
2285        struct page **pages;
2286        u32 page_count;
2287
2288        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2289        rbd_assert(obj_request_img_data_test(obj_request));
2290        img_request = obj_request->img_request;
2291        rbd_assert(img_request);
2292
2293        rbd_dev = img_request->rbd_dev;
2294        rbd_assert(rbd_dev);
2295
2296        pages = obj_request->copyup_pages;
2297        rbd_assert(pages != NULL);
2298        obj_request->copyup_pages = NULL;
2299        page_count = obj_request->copyup_page_count;
2300        rbd_assert(page_count);
2301        obj_request->copyup_page_count = 0;
2302        ceph_release_page_vector(pages, page_count);
2303
2304        /*
2305         * We want the transfer count to reflect the size of the
2306         * original write request.  There is no such thing as a
2307         * successful short write, so if the request was successful
2308         * we can just set it to the originally-requested length.
2309         */
2310        if (!obj_request->result)
2311                obj_request->xferred = obj_request->length;
2312
2313        /* Finish up with the normal image object callback */
2314
2315        rbd_img_obj_callback(obj_request);
2316}
2317
2318static void
2319rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2320{
2321        struct rbd_obj_request *orig_request;
2322        struct ceph_osd_request *osd_req;
2323        struct ceph_osd_client *osdc;
2324        struct rbd_device *rbd_dev;
2325        struct page **pages;
2326        u32 page_count;
2327        int img_result;
2328        u64 parent_length;
2329        u64 offset;
2330        u64 length;
2331
2332        rbd_assert(img_request_child_test(img_request));
2333
2334        /* First get what we need from the image request */
2335
2336        pages = img_request->copyup_pages;
2337        rbd_assert(pages != NULL);
2338        img_request->copyup_pages = NULL;
2339        page_count = img_request->copyup_page_count;
2340        rbd_assert(page_count);
2341        img_request->copyup_page_count = 0;
2342
2343        orig_request = img_request->obj_request;
2344        rbd_assert(orig_request != NULL);
2345        rbd_assert(obj_request_type_valid(orig_request->type));
2346        img_result = img_request->result;
2347        parent_length = img_request->length;
2348        rbd_assert(parent_length == img_request->xferred);
2349        rbd_img_request_put(img_request);
2350
2351        rbd_assert(orig_request->img_request);
2352        rbd_dev = orig_request->img_request->rbd_dev;
2353        rbd_assert(rbd_dev);
2354
2355        /*
2356         * If the overlap has become 0 (most likely because the
2357         * image has been flattened) we need to free the pages
2358         * and re-submit the original write request.
2359         */
2360        if (!rbd_dev->parent_overlap) {
2361                struct ceph_osd_client *osdc;
2362
2363                ceph_release_page_vector(pages, page_count);
2364                osdc = &rbd_dev->rbd_client->client->osdc;
2365                img_result = rbd_obj_request_submit(osdc, orig_request);
2366                if (!img_result)
2367                        return;
2368        }
2369
2370        if (img_result)
2371                goto out_err;
2372
2373        /*
2374         * The original osd request is of no use to use any more.
2375         * We need a new one that can hold the two ops in a copyup
2376         * request.  Allocate the new copyup osd request for the
2377         * original request, and release the old one.
2378         */
2379        img_result = -ENOMEM;
2380        osd_req = rbd_osd_req_create_copyup(orig_request);
2381        if (!osd_req)
2382                goto out_err;
2383        rbd_osd_req_destroy(orig_request->osd_req);
2384        orig_request->osd_req = osd_req;
2385        orig_request->copyup_pages = pages;
2386        orig_request->copyup_page_count = page_count;
2387
2388        /* Initialize the copyup op */
2389
2390        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2391        osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2392                                                false, false);
2393
2394        /* Then the original write request op */
2395
2396        offset = orig_request->offset;
2397        length = orig_request->length;
2398        osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2399                                        offset, length, 0, 0);
2400        if (orig_request->type == OBJ_REQUEST_BIO)
2401                osd_req_op_extent_osd_data_bio(osd_req, 1,
2402                                        orig_request->bio_list, length);
2403        else
2404                osd_req_op_extent_osd_data_pages(osd_req, 1,
2405                                        orig_request->pages, length,
2406                                        offset & ~PAGE_MASK, false, false);
2407
2408        rbd_osd_req_format_write(orig_request);
2409
2410        /* All set, send it off. */
2411
2412        orig_request->callback = rbd_img_obj_copyup_callback;
2413        osdc = &rbd_dev->rbd_client->client->osdc;
2414        img_result = rbd_obj_request_submit(osdc, orig_request);
2415        if (!img_result)
2416                return;
2417out_err:
2418        /* Record the error code and complete the request */
2419
2420        orig_request->result = img_result;
2421        orig_request->xferred = 0;
2422        obj_request_done_set(orig_request);
2423        rbd_obj_request_complete(orig_request);
2424}
2425
2426/*
2427 * Read from the parent image the range of data that covers the
2428 * entire target of the given object request.  This is used for
2429 * satisfying a layered image write request when the target of an
2430 * object request from the image request does not exist.
2431 *
2432 * A page array big enough to hold the returned data is allocated
2433 * and supplied to rbd_img_request_fill() as the "data descriptor."
2434 * When the read completes, this page array will be transferred to
2435 * the original object request for the copyup operation.
2436 *
2437 * If an error occurs, record it as the result of the original
2438 * object request and mark it done so it gets completed.
2439 */
2440static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2441{
2442        struct rbd_img_request *img_request = NULL;
2443        struct rbd_img_request *parent_request = NULL;
2444        struct rbd_device *rbd_dev;
2445        u64 img_offset;
2446        u64 length;
2447        struct page **pages = NULL;
2448        u32 page_count;
2449        int result;
2450
2451        rbd_assert(obj_request_img_data_test(obj_request));
2452        rbd_assert(obj_request_type_valid(obj_request->type));
2453
2454        img_request = obj_request->img_request;
2455        rbd_assert(img_request != NULL);
2456        rbd_dev = img_request->rbd_dev;
2457        rbd_assert(rbd_dev->parent != NULL);
2458
2459        /*
2460         * Determine the byte range covered by the object in the
2461         * child image to which the original request was to be sent.
2462         */
2463        img_offset = obj_request->img_offset - obj_request->offset;
2464        length = (u64)1 << rbd_dev->header.obj_order;
2465
2466        /*
2467         * There is no defined parent data beyond the parent
2468         * overlap, so limit what we read at that boundary if
2469         * necessary.
2470         */
2471        if (img_offset + length > rbd_dev->parent_overlap) {
2472                rbd_assert(img_offset < rbd_dev->parent_overlap);
2473                length = rbd_dev->parent_overlap - img_offset;
2474        }
2475
2476        /*
2477         * Allocate a page array big enough to receive the data read
2478         * from the parent.
2479         */
2480        page_count = (u32)calc_pages_for(0, length);
2481        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2482        if (IS_ERR(pages)) {
2483                result = PTR_ERR(pages);
2484                pages = NULL;
2485                goto out_err;
2486        }
2487
2488        result = -ENOMEM;
2489        parent_request = rbd_parent_request_create(obj_request,
2490                                                img_offset, length);
2491        if (!parent_request)
2492                goto out_err;
2493
2494        result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2495        if (result)
2496                goto out_err;
2497        parent_request->copyup_pages = pages;
2498        parent_request->copyup_page_count = page_count;
2499
2500        parent_request->callback = rbd_img_obj_parent_read_full_callback;
2501        result = rbd_img_request_submit(parent_request);
2502        if (!result)
2503                return 0;
2504
2505        parent_request->copyup_pages = NULL;
2506        parent_request->copyup_page_count = 0;
2507        parent_request->obj_request = NULL;
2508        rbd_obj_request_put(obj_request);
2509out_err:
2510        if (pages)
2511                ceph_release_page_vector(pages, page_count);
2512        if (parent_request)
2513                rbd_img_request_put(parent_request);
2514        obj_request->result = result;
2515        obj_request->xferred = 0;
2516        obj_request_done_set(obj_request);
2517
2518        return result;
2519}
2520
2521static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2522{
2523        struct rbd_obj_request *orig_request;
2524        struct rbd_device *rbd_dev;
2525        int result;
2526
2527        rbd_assert(!obj_request_img_data_test(obj_request));
2528
2529        /*
2530         * All we need from the object request is the original
2531         * request and the result of the STAT op.  Grab those, then
2532         * we're done with the request.
2533         */
2534        orig_request = obj_request->obj_request;
2535        obj_request->obj_request = NULL;
2536        rbd_obj_request_put(orig_request);
2537        rbd_assert(orig_request);
2538        rbd_assert(orig_request->img_request);
2539
2540        result = obj_request->result;
2541        obj_request->result = 0;
2542
2543        dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2544                obj_request, orig_request, result,
2545                obj_request->xferred, obj_request->length);
2546        rbd_obj_request_put(obj_request);
2547
2548        /*
2549         * If the overlap has become 0 (most likely because the
2550         * image has been flattened) we need to free the pages
2551         * and re-submit the original write request.
2552         */
2553        rbd_dev = orig_request->img_request->rbd_dev;
2554        if (!rbd_dev->parent_overlap) {
2555                struct ceph_osd_client *osdc;
2556
2557                osdc = &rbd_dev->rbd_client->client->osdc;
2558                result = rbd_obj_request_submit(osdc, orig_request);
2559                if (!result)
2560                        return;
2561        }
2562
2563        /*
2564         * Our only purpose here is to determine whether the object
2565         * exists, and we don't want to treat the non-existence as
2566         * an error.  If something else comes back, transfer the
2567         * error to the original request and complete it now.
2568         */
2569        if (!result) {
2570                obj_request_existence_set(orig_request, true);
2571        } else if (result == -ENOENT) {
2572                obj_request_existence_set(orig_request, false);
2573        } else if (result) {
2574                orig_request->result = result;
2575                goto out;
2576        }
2577
2578        /*
2579         * Resubmit the original request now that we have recorded
2580         * whether the target object exists.
2581         */
2582        orig_request->result = rbd_img_obj_request_submit(orig_request);
2583out:
2584        if (orig_request->result)
2585                rbd_obj_request_complete(orig_request);
2586}
2587
2588static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2589{
2590        struct rbd_obj_request *stat_request;
2591        struct rbd_device *rbd_dev;
2592        struct ceph_osd_client *osdc;
2593        struct page **pages = NULL;
2594        u32 page_count;
2595        size_t size;
2596        int ret;
2597
2598        /*
2599         * The response data for a STAT call consists of:
2600         *     le64 length;
2601         *     struct {
2602         *         le32 tv_sec;
2603         *         le32 tv_nsec;
2604         *     } mtime;
2605         */
2606        size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2607        page_count = (u32)calc_pages_for(0, size);
2608        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2609        if (IS_ERR(pages))
2610                return PTR_ERR(pages);
2611
2612        ret = -ENOMEM;
2613        stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2614                                                        OBJ_REQUEST_PAGES);
2615        if (!stat_request)
2616                goto out;
2617
2618        rbd_obj_request_get(obj_request);
2619        stat_request->obj_request = obj_request;
2620        stat_request->pages = pages;
2621        stat_request->page_count = page_count;
2622
2623        rbd_assert(obj_request->img_request);
2624        rbd_dev = obj_request->img_request->rbd_dev;
2625        stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2626                                                stat_request);
2627        if (!stat_request->osd_req)
2628                goto out;
2629        stat_request->callback = rbd_img_obj_exists_callback;
2630
2631        osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2632        osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2633                                        false, false);
2634        rbd_osd_req_format_read(stat_request);
2635
2636        osdc = &rbd_dev->rbd_client->client->osdc;
2637        ret = rbd_obj_request_submit(osdc, stat_request);
2638out:
2639        if (ret)
2640                rbd_obj_request_put(obj_request);
2641
2642        return ret;
2643}
2644
2645static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2646{
2647        struct rbd_img_request *img_request;
2648        struct rbd_device *rbd_dev;
2649        bool known;
2650
2651        rbd_assert(obj_request_img_data_test(obj_request));
2652
2653        img_request = obj_request->img_request;
2654        rbd_assert(img_request);
2655        rbd_dev = img_request->rbd_dev;
2656
2657        /*
2658         * Only writes to layered images need special handling.
2659         * Reads and non-layered writes are simple object requests.
2660         * Layered writes that start beyond the end of the overlap
2661         * with the parent have no parent data, so they too are
2662         * simple object requests.  Finally, if the target object is
2663         * known to already exist, its parent data has already been
2664         * copied, so a write to the object can also be handled as a
2665         * simple object request.
2666         */
2667        if (!img_request_write_test(img_request) ||
2668                !img_request_layered_test(img_request) ||
2669                rbd_dev->parent_overlap <= obj_request->img_offset ||
2670                ((known = obj_request_known_test(obj_request)) &&
2671                        obj_request_exists_test(obj_request))) {
2672
2673                struct rbd_device *rbd_dev;
2674                struct ceph_osd_client *osdc;
2675
2676                rbd_dev = obj_request->img_request->rbd_dev;
2677                osdc = &rbd_dev->rbd_client->client->osdc;
2678
2679                return rbd_obj_request_submit(osdc, obj_request);
2680        }
2681
2682        /*
2683         * It's a layered write.  The target object might exist but
2684         * we may not know that yet.  If we know it doesn't exist,
2685         * start by reading the data for the full target object from
2686         * the parent so we can use it for a copyup to the target.
2687         */
2688        if (known)
2689                return rbd_img_obj_parent_read_full(obj_request);
2690
2691        /* We don't know whether the target exists.  Go find out. */
2692
2693        return rbd_img_obj_exists_submit(obj_request);
2694}
2695
2696static int rbd_img_request_submit(struct rbd_img_request *img_request)
2697{
2698        struct rbd_obj_request *obj_request;
2699        struct rbd_obj_request *next_obj_request;
2700
2701        dout("%s: img %p\n", __func__, img_request);
2702        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2703                int ret;
2704
2705                ret = rbd_img_obj_request_submit(obj_request);
2706                if (ret)
2707                        return ret;
2708        }
2709
2710        return 0;
2711}
2712
2713static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2714{
2715        struct rbd_obj_request *obj_request;
2716        struct rbd_device *rbd_dev;
2717        u64 obj_end;
2718        u64 img_xferred;
2719        int img_result;
2720
2721        rbd_assert(img_request_child_test(img_request));
2722
2723        /* First get what we need from the image request and release it */
2724
2725        obj_request = img_request->obj_request;
2726        img_xferred = img_request->xferred;
2727        img_result = img_request->result;
2728        rbd_img_request_put(img_request);
2729
2730        /*
2731         * If the overlap has become 0 (most likely because the
2732         * image has been flattened) we need to re-submit the
2733         * original request.
2734         */
2735        rbd_assert(obj_request);
2736        rbd_assert(obj_request->img_request);
2737        rbd_dev = obj_request->img_request->rbd_dev;
2738        if (!rbd_dev->parent_overlap) {
2739                struct ceph_osd_client *osdc;
2740
2741                osdc = &rbd_dev->rbd_client->client->osdc;
2742                img_result = rbd_obj_request_submit(osdc, obj_request);
2743                if (!img_result)
2744                        return;
2745        }
2746
2747        obj_request->result = img_result;
2748        if (obj_request->result)
2749                goto out;
2750
2751        /*
2752         * We need to zero anything beyond the parent overlap
2753         * boundary.  Since rbd_img_obj_request_read_callback()
2754         * will zero anything beyond the end of a short read, an
2755         * easy way to do this is to pretend the data from the
2756         * parent came up short--ending at the overlap boundary.
2757         */
2758        rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2759        obj_end = obj_request->img_offset + obj_request->length;
2760        if (obj_end > rbd_dev->parent_overlap) {
2761                u64 xferred = 0;
2762
2763                if (obj_request->img_offset < rbd_dev->parent_overlap)
2764                        xferred = rbd_dev->parent_overlap -
2765                                        obj_request->img_offset;
2766
2767                obj_request->xferred = min(img_xferred, xferred);
2768        } else {
2769                obj_request->xferred = img_xferred;
2770        }
2771out:
2772        rbd_img_obj_request_read_callback(obj_request);
2773        rbd_obj_request_complete(obj_request);
2774}
2775
2776static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2777{
2778        struct rbd_img_request *img_request;
2779        int result;
2780
2781        rbd_assert(obj_request_img_data_test(obj_request));
2782        rbd_assert(obj_request->img_request != NULL);
2783        rbd_assert(obj_request->result == (s32) -ENOENT);
2784        rbd_assert(obj_request_type_valid(obj_request->type));
2785
2786        /* rbd_read_finish(obj_request, obj_request->length); */
2787        img_request = rbd_parent_request_create(obj_request,
2788                                                obj_request->img_offset,
2789                                                obj_request->length);
2790        result = -ENOMEM;
2791        if (!img_request)
2792                goto out_err;
2793
2794        if (obj_request->type == OBJ_REQUEST_BIO)
2795                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2796                                                obj_request->bio_list);
2797        else
2798                result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2799                                                obj_request->pages);
2800        if (result)
2801                goto out_err;
2802
2803        img_request->callback = rbd_img_parent_read_callback;
2804        result = rbd_img_request_submit(img_request);
2805        if (result)
2806                goto out_err;
2807
2808        return;
2809out_err:
2810        if (img_request)
2811                rbd_img_request_put(img_request);
2812        obj_request->result = result;
2813        obj_request->xferred = 0;
2814        obj_request_done_set(obj_request);
2815}
2816
2817static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2818{
2819        struct rbd_obj_request *obj_request;
2820        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2821        int ret;
2822
2823        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2824                                                        OBJ_REQUEST_NODATA);
2825        if (!obj_request)
2826                return -ENOMEM;
2827
2828        ret = -ENOMEM;
2829        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2830        if (!obj_request->osd_req)
2831                goto out;
2832
2833        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2834                                        notify_id, 0, 0);
2835        rbd_osd_req_format_read(obj_request);
2836
2837        ret = rbd_obj_request_submit(osdc, obj_request);
2838        if (ret)
2839                goto out;
2840        ret = rbd_obj_request_wait(obj_request);
2841out:
2842        rbd_obj_request_put(obj_request);
2843
2844        return ret;
2845}
2846
2847static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2848{
2849        struct rbd_device *rbd_dev = (struct rbd_device *)data;
2850        int ret;
2851
2852        if (!rbd_dev)
2853                return;
2854
2855        dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2856                rbd_dev->header_name, (unsigned long long)notify_id,
2857                (unsigned int)opcode);
2858        ret = rbd_dev_refresh(rbd_dev);
2859        if (ret)
2860                rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
2861
2862        rbd_obj_notify_ack_sync(rbd_dev, notify_id);
2863}
2864
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 *
 * When starting, a watch event (with rbd_watch_cb() as its callback)
 * is created and a lingering WATCH request is submitted; on success
 * the request is kept in rbd_dev->watch_request.  When tearing down,
 * the lingering request is unregistered, a WATCH request with a zero
 * flag is sent, and the saved request and the event are released.
 *
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;

        /*
         * Starting requires that no watch be in place; tearing down
         * requires that one is.
         */
        rbd_assert(start ^ !!rbd_dev->watch_event);
        rbd_assert(start ^ !!rbd_dev->watch_request);

        if (start) {
                ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
                                                &rbd_dev->watch_event);
                if (ret < 0)
                        return ret;
                rbd_assert(rbd_dev->watch_event != NULL);
        }

        /* Both allocation failures below report -ENOMEM */
        ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                        OBJ_REQUEST_NODATA);
        if (!obj_request)
                goto out_cancel;

        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
        if (!obj_request->osd_req)
                goto out_cancel;

        /*
         * NOTE(review): on teardown the existing lingering request
         * is unregistered here, before the unwatch request below has
         * been submitted, so a later failure does not restore it.
         */
        if (start)
                ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
        else
                ceph_osdc_unregister_linger_request(osdc,
                                        rbd_dev->watch_request->osd_req);

        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
                                rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
        rbd_osd_req_format_write(obj_request);

        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out_cancel;
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out_cancel;
        ret = obj_request->result;
        if (ret)
                goto out_cancel;

        /*
         * A watch request is set to linger, so the underlying osd
         * request won't go away until we unregister it.  We retain
         * a pointer to the object request during that time (in
         * rbd_dev->watch_request), so we'll keep a reference to
         * it.  We'll drop that reference (below) after we've
         * unregistered it.
         */
        if (start) {
                rbd_dev->watch_request = obj_request;

                return 0;
        }

        /* We have successfully torn down the watch request */

        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
out_cancel:
        /*
         * Cancel the event if we're tearing down, or on error.
         *
         * NOTE(review): an error during teardown also lands here, so
         * the event is cancelled and watch_event cleared while
         * rbd_dev->watch_request is left set.  Confirm callers can
         * cope with that state.
         */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        if (obj_request)
                rbd_obj_request_put(obj_request);

        return ret;
}
2943
2944/*
2945 * Synchronous osd object method call.  Returns the number of bytes
2946 * returned in the outbound buffer, or a negative error code.
2947 */
2948static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2949                             const char *object_name,
2950                             const char *class_name,
2951                             const char *method_name,
2952                             const void *outbound,
2953                             size_t outbound_size,
2954                             void *inbound,
2955                             size_t inbound_size)
2956{
2957        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2958        struct rbd_obj_request *obj_request;
2959        struct page **pages;
2960        u32 page_count;
2961        int ret;
2962
2963        /*
2964         * Method calls are ultimately read operations.  The result
2965         * should placed into the inbound buffer provided.  They
2966         * also supply outbound data--parameters for the object
2967         * method.  Currently if this is present it will be a
2968         * snapshot id.
2969         */
2970        page_count = (u32)calc_pages_for(0, inbound_size);
2971        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2972        if (IS_ERR(pages))
2973                return PTR_ERR(pages);
2974
2975        ret = -ENOMEM;
2976        obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2977                                                        OBJ_REQUEST_PAGES);
2978        if (!obj_request)
2979                goto out;
2980
2981        obj_request->pages = pages;
2982        obj_request->page_count = page_count;
2983
2984        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2985        if (!obj_request->osd_req)
2986                goto out;
2987
2988        osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2989                                        class_name, method_name);
2990        if (outbound_size) {
2991                struct ceph_pagelist *pagelist;
2992
2993                pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2994                if (!pagelist)
2995                        goto out;
2996
2997                ceph_pagelist_init(pagelist);
2998                ceph_pagelist_append(pagelist, outbound, outbound_size);
2999                osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3000                                                pagelist);
3001        }
3002        osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3003                                        obj_request->pages, inbound_size,
3004                                        0, false, false);
3005        rbd_osd_req_format_read(obj_request);
3006
3007        ret = rbd_obj_request_submit(osdc, obj_request);
3008        if (ret)
3009                goto out;
3010        ret = rbd_obj_request_wait(obj_request);
3011        if (ret)
3012                goto out;
3013
3014        ret = obj_request->result;
3015        if (ret < 0)
3016                goto out;
3017
3018        rbd_assert(obj_request->xferred < (u64)INT_MAX);
3019        ret = (int)obj_request->xferred;
3020        ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3021out:
3022        if (obj_request)
3023                rbd_obj_request_put(obj_request);
3024        else
3025                ceph_release_page_vector(pages, page_count);
3026
3027        return ret;
3028}
3029
/*
 * Request function for the rbd block queue (installed through
 * blk_init_queue() in rbd_init_disk()).
 *
 * Fetches requests from the queue one at a time.  Each valid request
 * is turned into an image request that is filled from the request's
 * bio chain and submitted.  The queue lock is dropped while a
 * request is validated and submitted and is retaken before the next
 * request is fetched, as the __releases/__acquires annotations
 * indicate.
 *
 * Only requests that fail here (negative result) are ended here; a
 * successfully submitted request is left to be completed elsewhere.
 */
static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
{
        struct rbd_device *rbd_dev = q->queuedata;
        /* Sampled once, before any request is fetched */
        bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;

        while ((rq = blk_fetch_request(q))) {
                bool write_request = rq_data_dir(rq) == WRITE;
                struct rbd_img_request *img_request;
                u64 offset;
                u64 length;

                /* Ignore any non-FS requests that filter through. */

                if (rq->cmd_type != REQ_TYPE_FS) {
                        dout("%s: non-fs request type %d\n", __func__,
                                (int) rq->cmd_type);
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* Ignore/skip any zero-length requests */

                offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
                length = (u64) blk_rq_bytes(rq);

                if (!length) {
                        dout("%s: zero-length request\n", __func__);
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /*
                 * Everything from here to end_request runs without
                 * the queue lock held.
                 */
                spin_unlock_irq(q->queue_lock);

                /* Disallow writes to a read-only device */

                if (write_request) {
                        result = -EROFS;
                        if (read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }

                /*
                 * Quit early if the mapped snapshot no longer
                 * exists.  It's still possible the snapshot will
                 * have disappeared by the time our request arrives
                 * at the osd, but there's no sense in sending it if
                 * we already know.
                 */
                if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
                        dout("request for non-existent snapshot");
                        rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        result = -ENXIO;
                        goto end_request;
                }

                /* Reject a range whose end would not fit in 64 bits */
                result = -EINVAL;
                if (offset && length > U64_MAX - offset + 1) {
                        rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
                                offset, length);
                        goto end_request;       /* Shouldn't happen */
                }

                /* Reject a range that extends past the mapped size */
                result = -EIO;
                if (offset + length > rbd_dev->mapping.size) {
                        rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
                                offset, length, rbd_dev->mapping.size);
                        goto end_request;
                }

                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
                                                        write_request);
                if (!img_request)
                        goto end_request;

                img_request->rq = rq;

                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
                                                rq->bio);
                if (!result)
                        result = rbd_img_request_submit(img_request);
                /* Drop the image request if it was not submitted */
                if (result)
                        rbd_img_request_put(img_request);
end_request:
                /* Retake the lock before ending or fetching a request */
                spin_lock_irq(q->queue_lock);
                if (result < 0) {
                        rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
                                write_request ? "write" : "read",
                                length, offset, result);

                        __blk_end_request_all(rq, result);
                }
        }
}
3128
3129/*
3130 * a queue callback. Makes sure that we don't create a bio that spans across
3131 * multiple osd objects. One exception would be with a single page bios,
3132 * which we handle later at bio_chain_clone_range()
3133 */
3134static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3135                          struct bio_vec *bvec)
3136{
3137        struct rbd_device *rbd_dev = q->queuedata;
3138        sector_t sector_offset;
3139        sector_t sectors_per_obj;
3140        sector_t obj_sector_offset;
3141        int ret;
3142
3143        /*
3144         * Find how far into its rbd object the partition-relative
3145         * bio start sector is to offset relative to the enclosing
3146         * device.
3147         */
3148        sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3149        sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3150        obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3151
3152        /*
3153         * Compute the number of bytes from that offset to the end
3154         * of the object.  Account for what's already used by the bio.
3155         */
3156        ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3157        if (ret > bmd->bi_size)
3158                ret -= bmd->bi_size;
3159        else
3160                ret = 0;
3161
3162        /*
3163         * Don't send back more than was asked for.  And if the bio
3164         * was empty, let the whole thing through because:  "Note
3165         * that a block device *must* allow a single page to be
3166         * added to an empty bio."
3167         */
3168        rbd_assert(bvec->bv_len <= PAGE_SIZE);
3169        if (ret > (int) bvec->bv_len || !bmd->bi_size)
3170                ret = (int) bvec->bv_len;
3171
3172        return ret;
3173}
3174
3175static void rbd_free_disk(struct rbd_device *rbd_dev)
3176{
3177        struct gendisk *disk = rbd_dev->disk;
3178
3179        if (!disk)
3180                return;
3181
3182        rbd_dev->disk = NULL;
3183        if (disk->flags & GENHD_FL_UP) {
3184                del_gendisk(disk);
3185                if (disk->queue)
3186                        blk_cleanup_queue(disk->queue);
3187        }
3188        put_disk(disk);
3189}
3190
3191static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3192                                const char *object_name,
3193                                u64 offset, u64 length, void *buf)
3194
3195{
3196        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3197        struct rbd_obj_request *obj_request;
3198        struct page **pages = NULL;
3199        u32 page_count;
3200        size_t size;
3201        int ret;
3202
3203        page_count = (u32) calc_pages_for(offset, length);
3204        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3205        if (IS_ERR(pages))
3206                ret = PTR_ERR(pages);
3207
3208        ret = -ENOMEM;
3209        obj_request = rbd_obj_request_create(object_name, offset, length,
3210                                                        OBJ_REQUEST_PAGES);
3211        if (!obj_request)
3212                goto out;
3213
3214        obj_request->pages = pages;
3215        obj_request->page_count = page_count;
3216
3217        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3218        if (!obj_request->osd_req)
3219                goto out;
3220
3221        osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3222                                        offset, length, 0, 0);
3223        osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3224                                        obj_request->pages,
3225                                        obj_request->length,
3226                                        obj_request->offset & ~PAGE_MASK,
3227                                        false, false);
3228        rbd_osd_req_format_read(obj_request);
3229
3230        ret = rbd_obj_request_submit(osdc, obj_request);
3231        if (ret)
3232                goto out;
3233        ret = rbd_obj_request_wait(obj_request);
3234        if (ret)
3235                goto out;
3236
3237        ret = obj_request->result;
3238        if (ret < 0)
3239                goto out;
3240
3241        rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3242        size = (size_t) obj_request->xferred;
3243        ceph_copy_from_page_vector(pages, buf, 0, size);
3244        rbd_assert(size <= (size_t)INT_MAX);
3245        ret = (int)size;
3246out:
3247        if (obj_request)
3248                rbd_obj_request_put(obj_request);
3249        else
3250                ceph_release_page_vector(pages, page_count);
3251
3252        return ret;
3253}
3254
3255/*
3256 * Read the complete header for the given rbd device.  On successful
3257 * return, the rbd_dev->header field will contain up-to-date
3258 * information about the image.
3259 */
3260static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3261{
3262        struct rbd_image_header_ondisk *ondisk = NULL;
3263        u32 snap_count = 0;
3264        u64 names_size = 0;
3265        u32 want_count;
3266        int ret;
3267
3268        /*
3269         * The complete header will include an array of its 64-bit
3270         * snapshot ids, followed by the names of those snapshots as
3271         * a contiguous block of NUL-terminated strings.  Note that
3272         * the number of snapshots could change by the time we read
3273         * it in, in which case we re-read it.
3274         */
3275        do {
3276                size_t size;
3277
3278                kfree(ondisk);
3279
3280                size = sizeof (*ondisk);
3281                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3282                size += names_size;
3283                ondisk = kmalloc(size, GFP_KERNEL);
3284                if (!ondisk)
3285                        return -ENOMEM;
3286
3287                ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3288                                       0, size, ondisk);
3289                if (ret < 0)
3290                        goto out;
3291                if ((size_t)ret < size) {
3292                        ret = -ENXIO;
3293                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3294                                size, ret);
3295                        goto out;
3296                }
3297                if (!rbd_dev_ondisk_valid(ondisk)) {
3298                        ret = -ENXIO;
3299                        rbd_warn(rbd_dev, "invalid header");
3300                        goto out;
3301                }
3302
3303                names_size = le64_to_cpu(ondisk->snap_names_len);
3304                want_count = snap_count;
3305                snap_count = le32_to_cpu(ondisk->snap_count);
3306        } while (snap_count != want_count);
3307
3308        ret = rbd_header_from_disk(rbd_dev, ondisk);
3309out:
3310        kfree(ondisk);
3311
3312        return ret;
3313}
3314
3315/*
3316 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3317 * has disappeared from the (just updated) snapshot context.
3318 */
3319static void rbd_exists_validate(struct rbd_device *rbd_dev)
3320{
3321        u64 snap_id;
3322
3323        if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3324                return;
3325
3326        snap_id = rbd_dev->spec->snap_id;
3327        if (snap_id == CEPH_NOSNAP)
3328                return;
3329
3330        if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3331                clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3332}
3333
3334static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3335{
3336        sector_t size;
3337        bool removing;
3338
3339        /*
3340         * Don't hold the lock while doing disk operations,
3341         * or lock ordering will conflict with the bdev mutex via:
3342         * rbd_add() -> blkdev_get() -> rbd_open()
3343         */
3344        spin_lock_irq(&rbd_dev->lock);
3345        removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3346        spin_unlock_irq(&rbd_dev->lock);
3347        /*
3348         * If the device is being removed, rbd_dev->disk has
3349         * been destroyed, so don't try to update its size
3350         */
3351        if (!removing) {
3352                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3353                dout("setting size to %llu sectors", (unsigned long long)size);
3354                set_capacity(rbd_dev->disk, size);
3355                revalidate_disk(rbd_dev->disk);
3356        }
3357}
3358
3359static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3360{
3361        u64 mapping_size;
3362        int ret;
3363
3364        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3365        down_write(&rbd_dev->header_rwsem);
3366        mapping_size = rbd_dev->mapping.size;
3367        if (rbd_dev->image_format == 1)
3368                ret = rbd_dev_v1_header_info(rbd_dev);
3369        else
3370                ret = rbd_dev_v2_header_info(rbd_dev);
3371
3372        /* If it's a mapped snapshot, validate its EXISTS flag */
3373
3374        rbd_exists_validate(rbd_dev);
3375        up_write(&rbd_dev->header_rwsem);
3376
3377        if (mapping_size != rbd_dev->mapping.size) {
3378                rbd_dev_update_size(rbd_dev);
3379        }
3380
3381        return ret;
3382}
3383
3384static int rbd_init_disk(struct rbd_device *rbd_dev)
3385{
3386        struct gendisk *disk;
3387        struct request_queue *q;
3388        u64 segment_size;
3389
3390        /* create gendisk info */
3391        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3392        if (!disk)
3393                return -ENOMEM;
3394
3395        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3396                 rbd_dev->dev_id);
3397        disk->major = rbd_dev->major;
3398        disk->first_minor = 0;
3399        disk->fops = &rbd_bd_ops;
3400        disk->private_data = rbd_dev;
3401
3402        q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3403        if (!q)
3404                goto out_disk;
3405
3406        /* We use the default size, but let's be explicit about it. */
3407        blk_queue_physical_block_size(q, SECTOR_SIZE);
3408
3409        /* set io sizes to object size */
3410        segment_size = rbd_obj_bytes(&rbd_dev->header);
3411        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3412        blk_queue_max_segment_size(q, segment_size);
3413        blk_queue_io_min(q, segment_size);
3414        blk_queue_io_opt(q, segment_size);
3415
3416        blk_queue_merge_bvec(q, rbd_merge_bvec);
3417        disk->queue = q;
3418
3419        q->queuedata = rbd_dev;
3420
3421        rbd_dev->disk = disk;
3422
3423        return 0;
3424out_disk:
3425        put_disk(disk);
3426
3427        return -ENOMEM;
3428}
3429
3430/*
3431  sysfs
3432*/
3433
3434static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3435{
3436        return container_of(dev, struct rbd_device, dev);
3437}
3438
3439static ssize_t rbd_size_show(struct device *dev,
3440                             struct device_attribute *attr, char *buf)
3441{
3442        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3443
3444        return sprintf(buf, "%llu\n",
3445                (unsigned long long)rbd_dev->mapping.size);
3446}
3447
3448/*
3449 * Note this shows the features for whatever's mapped, which is not
3450 * necessarily the base image.
3451 */
3452static ssize_t rbd_features_show(struct device *dev,
3453                             struct device_attribute *attr, char *buf)
3454{
3455        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3456
3457        return sprintf(buf, "0x%016llx\n",
3458                        (unsigned long long)rbd_dev->mapping.features);
3459}
3460
3461static ssize_t rbd_major_show(struct device *dev,
3462                              struct device_attribute *attr, char *buf)
3463{
3464        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3465
3466        if (rbd_dev->major)
3467                return sprintf(buf, "%d\n", rbd_dev->major);
3468
3469        return sprintf(buf, "(none)\n");
3470
3471}
3472
3473static ssize_t rbd_client_id_show(struct device *dev,
3474                                  struct device_attribute *attr, char *buf)
3475{
3476        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3477
3478        return sprintf(buf, "client%lld\n",
3479                        ceph_client_id(rbd_dev->rbd_client->client));
3480}
3481
3482static ssize_t rbd_pool_show(struct device *dev,
3483                             struct device_attribute *attr, char *buf)
3484{
3485        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3486
3487        return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3488}
3489
3490static ssize_t rbd_pool_id_show(struct device *dev,
3491                             struct device_attribute *attr, char *buf)
3492{
3493        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3494
3495        return sprintf(buf, "%llu\n",
3496                        (unsigned long long) rbd_dev->spec->pool_id);
3497}
3498
3499static ssize_t rbd_name_show(struct device *dev,
3500                             struct device_attribute *attr, char *buf)
3501{
3502        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3503
3504        if (rbd_dev->spec->image_name)
3505                return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3506
3507        return sprintf(buf, "(unknown)\n");
3508}
3509
3510static ssize_t rbd_image_id_show(struct device *dev,
3511                             struct device_attribute *attr, char *buf)
3512{
3513        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3514
3515        return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3516}
3517
3518/*
3519 * Shows the name of the currently-mapped snapshot (or
3520 * RBD_SNAP_HEAD_NAME for the base image).
3521 */
3522static ssize_t rbd_snap_show(struct device *dev,
3523                             struct device_attribute *attr,
3524                             char *buf)
3525{
3526        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3527
3528        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3529}
3530
3531/*
3532 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3533 * for the parent image.  If there is no parent, simply shows
3534 * "(no parent image)".
3535 */
3536static ssize_t rbd_parent_show(struct device *dev,
3537                             struct device_attribute *attr,
3538                             char *buf)
3539{
3540        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3541        struct rbd_spec *spec = rbd_dev->parent_spec;
3542        int count;
3543        char *bufp = buf;
3544
3545        if (!spec)
3546                return sprintf(buf, "(no parent image)\n");
3547
3548        count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3549                        (unsigned long long) spec->pool_id, spec->pool_name);
3550        if (count < 0)
3551                return count;
3552        bufp += count;
3553
3554        count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3555                        spec->image_name ? spec->image_name : "(unknown)");
3556        if (count < 0)
3557                return count;
3558        bufp += count;
3559
3560        count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3561                        (unsigned long long) spec->snap_id, spec->snap_name);
3562        if (count < 0)
3563                return count;
3564        bufp += count;
3565
3566        count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3567        if (count < 0)
3568                return count;
3569        bufp += count;
3570
3571        return (ssize_t) (bufp - buf);
3572}
3573
3574static ssize_t rbd_image_refresh(struct device *dev,
3575                                 struct device_attribute *attr,
3576                                 const char *buf,
3577                                 size_t size)
3578{
3579        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3580        int ret;
3581
3582        ret = rbd_dev_refresh(rbd_dev);
3583        if (ret)
3584                rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3585
3586        return ret < 0 ? ret : size;
3587}
3588
/*
 * Per-device sysfs attributes.  All of them are read-only (S_IRUGO)
 * and backed by the corresponding *_show() routine, except for
 * "refresh", which is write-only for the owner (S_IWUSR) and is
 * handled by rbd_image_refresh().
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

/* NULL-terminated list of every attribute defined above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

/* The single (unnamed) attribute group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list, referenced by rbd_device_type.groups */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
3624
/*
 * Release callback installed in rbd_device_type.  It deliberately
 * does nothing here.
 * NOTE(review): presumably the rbd_device itself is torn down
 * elsewhere (see rbd_dev_destroy()) -- confirm against the callers.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
3628
/*
 * Device type for rbd devices: names the type "rbd", attaches the
 * sysfs attribute groups defined above, and supplies the (empty)
 * release callback.
 */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
3634
3635static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3636{
3637        kref_get(&spec->kref);
3638
3639        return spec;
3640}
3641
3642static void rbd_spec_free(struct kref *kref);
3643static void rbd_spec_put(struct rbd_spec *spec)
3644{
3645        if (spec)
3646                kref_put(&spec->kref, rbd_spec_free);
3647}
3648
3649static struct rbd_spec *rbd_spec_alloc(void)
3650{
3651        struct rbd_spec *spec;
3652
3653        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3654        if (!spec)
3655                return NULL;
3656        kref_init(&spec->kref);
3657
3658        return spec;
3659}
3660
3661static void rbd_spec_free(struct kref *kref)
3662{
3663        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3664
3665        kfree(spec->pool_name);
3666        kfree(spec->image_id);
3667        kfree(spec->image_name);
3668        kfree(spec->snap_name);
3669        kfree(spec);
3670}
3671
3672static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3673                                struct rbd_spec *spec)
3674{
3675        struct rbd_device *rbd_dev;
3676
3677        rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3678        if (!rbd_dev)
3679                return NULL;
3680
3681        spin_lock_init(&rbd_dev->lock);
3682        rbd_dev->flags = 0;
3683        atomic_set(&rbd_dev->parent_ref, 0);
3684        INIT_LIST_HEAD(&rbd_dev->node);
3685        init_rwsem(&rbd_dev->header_rwsem);
3686
3687        rbd_dev->spec = spec;
3688        rbd_dev->rbd_client = rbdc;
3689
3690        /* Initialize the layout used for all rbd requests */
3691
3692        rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3693        rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3694        rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3695        rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3696
3697        return rbd_dev;
3698}
3699
3700static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3701{
3702        rbd_put_client(rbd_dev->rbd_client);
3703        rbd_spec_put(rbd_dev->spec);
3704        kfree(rbd_dev);
3705}
3706
3707/*
3708 * Get the size and object order for an image snapshot, or if
3709 * snap_id is CEPH_NOSNAP, gets this information for the base
3710 * image.
3711 */
3712static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3713                                u8 *order, u64 *snap_size)
3714{
3715        __le64 snapid = cpu_to_le64(snap_id);
3716        int ret;
3717        struct {
3718                u8 order;
3719                __le64 size;
3720        } __attribute__ ((packed)) size_buf = { 0 };
3721
3722        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3723                                "rbd", "get_size",
3724                                &snapid, sizeof (snapid),
3725                                &size_buf, sizeof (size_buf));
3726        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3727        if (ret < 0)
3728                return ret;
3729        if (ret < sizeof (size_buf))
3730                return -ERANGE;
3731
3732        if (order) {
3733                *order = size_buf.order;
3734                dout("  order %u", (unsigned int)*order);
3735        }
3736        *snap_size = le64_to_cpu(size_buf.size);
3737
3738        dout("  snap_id 0x%016llx snap_size = %llu\n",
3739                (unsigned long long)snap_id,
3740                (unsigned long long)*snap_size);
3741
3742        return 0;
3743}
3744
3745static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3746{
3747        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3748                                        &rbd_dev->header.obj_order,
3749                                        &rbd_dev->header.image_size);
3750}
3751
3752static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3753{
3754        void *reply_buf;
3755        int ret;
3756        void *p;
3757
3758        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3759        if (!reply_buf)
3760                return -ENOMEM;
3761
3762        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3763                                "rbd", "get_object_prefix", NULL, 0,
3764                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3765        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3766        if (ret < 0)
3767                goto out;
3768
3769        p = reply_buf;
3770        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3771                                                p + ret, NULL, GFP_NOIO);
3772        ret = 0;
3773
3774        if (IS_ERR(rbd_dev->header.object_prefix)) {
3775                ret = PTR_ERR(rbd_dev->header.object_prefix);
3776                rbd_dev->header.object_prefix = NULL;
3777        } else {
3778                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3779        }
3780out:
3781        kfree(reply_buf);
3782
3783        return ret;
3784}
3785
3786static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3787                u64 *snap_features)
3788{
3789        __le64 snapid = cpu_to_le64(snap_id);
3790        struct {
3791                __le64 features;
3792                __le64 incompat;
3793        } __attribute__ ((packed)) features_buf = { 0 };
3794        u64 incompat;
3795        int ret;
3796
3797        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3798                                "rbd", "get_features",
3799                                &snapid, sizeof (snapid),
3800                                &features_buf, sizeof (features_buf));
3801        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3802        if (ret < 0)
3803                return ret;
3804        if (ret < sizeof (features_buf))
3805                return -ERANGE;
3806
3807        incompat = le64_to_cpu(features_buf.incompat);
3808        if (incompat & ~RBD_FEATURES_SUPPORTED)
3809                return -ENXIO;
3810
3811        *snap_features = le64_to_cpu(features_buf.features);
3812
3813        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3814                (unsigned long long)snap_id,
3815                (unsigned long long)*snap_features,
3816                (unsigned long long)le64_to_cpu(features_buf.incompat));
3817
3818        return 0;
3819}
3820
3821static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3822{
3823        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3824                                                &rbd_dev->header.features);
3825}
3826
3827static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3828{
3829        struct rbd_spec *parent_spec;
3830        size_t size;
3831        void *reply_buf = NULL;
3832        __le64 snapid;
3833        void *p;
3834        void *end;
3835        u64 pool_id;
3836        char *image_id;
3837        u64 snap_id;
3838        u64 overlap;
3839        int ret;
3840
3841        parent_spec = rbd_spec_alloc();
3842        if (!parent_spec)
3843                return -ENOMEM;
3844
3845        size = sizeof (__le64) +                                /* pool_id */
3846                sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3847                sizeof (__le64) +                               /* snap_id */
3848                sizeof (__le64);                                /* overlap */
3849        reply_buf = kmalloc(size, GFP_KERNEL);
3850        if (!reply_buf) {
3851                ret = -ENOMEM;
3852                goto out_err;
3853        }
3854
3855        snapid = cpu_to_le64(CEPH_NOSNAP);
3856        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3857                                "rbd", "get_parent",
3858                                &snapid, sizeof (snapid),
3859                                reply_buf, size);
3860        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3861        if (ret < 0)
3862                goto out_err;
3863
3864        p = reply_buf;
3865        end = reply_buf + ret;
3866        ret = -ERANGE;
3867        ceph_decode_64_safe(&p, end, pool_id, out_err);
3868        if (pool_id == CEPH_NOPOOL) {
3869                /*
3870                 * Either the parent never existed, or we have
3871                 * record of it but the image got flattened so it no
3872                 * longer has a parent.  When the parent of a
3873                 * layered image disappears we immediately set the
3874                 * overlap to 0.  The effect of this is that all new
3875                 * requests will be treated as if the image had no
3876                 * parent.
3877                 */
3878                if (rbd_dev->parent_overlap) {
3879                        rbd_dev->parent_overlap = 0;
3880                        smp_mb();
3881                        rbd_dev_parent_put(rbd_dev);
3882                        pr_info("%s: clone image has been flattened\n",
3883                                rbd_dev->disk->disk_name);
3884                }
3885
3886                goto out;       /* No parent?  No problem. */
3887        }
3888
3889        /* The ceph file layout needs to fit pool id in 32 bits */
3890
3891        ret = -EIO;
3892        if (pool_id > (u64)U32_MAX) {
3893                rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3894                        (unsigned long long)pool_id, U32_MAX);
3895                goto out_err;
3896        }
3897
3898        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3899        if (IS_ERR(image_id)) {
3900                ret = PTR_ERR(image_id);
3901                goto out_err;
3902        }
3903        ceph_decode_64_safe(&p, end, snap_id, out_err);
3904        ceph_decode_64_safe(&p, end, overlap, out_err);
3905
3906        /*
3907         * The parent won't change (except when the clone is
3908         * flattened, already handled that).  So we only need to
3909         * record the parent spec we have not already done so.
3910         */
3911        if (!rbd_dev->parent_spec) {
3912                parent_spec->pool_id = pool_id;
3913                parent_spec->image_id = image_id;
3914                parent_spec->snap_id = snap_id;
3915                rbd_dev->parent_spec = parent_spec;
3916                parent_spec = NULL;     /* rbd_dev now owns this */
3917        }
3918
3919        /*
3920         * We always update the parent overlap.  If it's zero we
3921         * treat it specially.
3922         */
3923        rbd_dev->parent_overlap = overlap;
3924        smp_mb();
3925        if (!overlap) {
3926
3927                /* A null parent_spec indicates it's the initial probe */
3928
3929                if (parent_spec) {
3930                        /*
3931                         * The overlap has become zero, so the clone
3932                         * must have been resized down to 0 at some
3933                         * point.  Treat this the same as a flatten.
3934                         */
3935                        rbd_dev_parent_put(rbd_dev);
3936                        pr_info("%s: clone image now standalone\n",
3937                                rbd_dev->disk->disk_name);
3938                } else {
3939                        /*
3940                         * For the initial probe, if we find the
3941                         * overlap is zero we just pretend there was
3942                         * no parent image.
3943                         */
3944                        rbd_warn(rbd_dev, "ignoring parent of "
3945                                                "clone with overlap 0\n");
3946                }
3947        }
3948out:
3949        ret = 0;
3950out_err:
3951        kfree(reply_buf);
3952        rbd_spec_put(parent_spec);
3953
3954        return ret;
3955}
3956
3957static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3958{
3959        struct {
3960                __le64 stripe_unit;
3961                __le64 stripe_count;
3962        } __attribute__ ((packed)) striping_info_buf = { 0 };
3963        size_t size = sizeof (striping_info_buf);
3964        void *p;
3965        u64 obj_size;
3966        u64 stripe_unit;
3967        u64 stripe_count;
3968        int ret;
3969
3970        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3971                                "rbd", "get_stripe_unit_count", NULL, 0,
3972                                (char *)&striping_info_buf, size);
3973        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3974        if (ret < 0)
3975                return ret;
3976        if (ret < size)
3977                return -ERANGE;
3978
3979        /*
3980         * We don't actually support the "fancy striping" feature
3981         * (STRIPINGV2) yet, but if the striping sizes are the
3982         * defaults the behavior is the same as before.  So find
3983         * out, and only fail if the image has non-default values.
3984         */
3985        ret = -EINVAL;
3986        obj_size = (u64)1 << rbd_dev->header.obj_order;
3987        p = &striping_info_buf;
3988        stripe_unit = ceph_decode_64(&p);
3989        if (stripe_unit != obj_size) {
3990                rbd_warn(rbd_dev, "unsupported stripe unit "
3991                                "(got %llu want %llu)",
3992                                stripe_unit, obj_size);
3993                return -EINVAL;
3994        }
3995        stripe_count = ceph_decode_64(&p);
3996        if (stripe_count != 1) {
3997                rbd_warn(rbd_dev, "unsupported stripe count "
3998                                "(got %llu want 1)", stripe_count);
3999                return -EINVAL;
4000        }
4001        rbd_dev->header.stripe_unit = stripe_unit;
4002        rbd_dev->header.stripe_count = stripe_count;
4003
4004        return 0;
4005}
4006
4007static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4008{
4009        size_t image_id_size;
4010        char *image_id;
4011        void *p;
4012        void *end;
4013        size_t size;
4014        void *reply_buf = NULL;
4015        size_t len = 0;
4016        char *image_name = NULL;
4017        int ret;
4018
4019        rbd_assert(!rbd_dev->spec->image_name);
4020
4021        len = strlen(rbd_dev->spec->image_id);
4022        image_id_size = sizeof (__le32) + len;
4023        image_id = kmalloc(image_id_size, GFP_KERNEL);
4024        if (!image_id)
4025                return NULL;
4026
4027        p = image_id;
4028        end = image_id + image_id_size;
4029        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4030
4031        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4032        reply_buf = kmalloc(size, GFP_KERNEL);
4033        if (!reply_buf)
4034                goto out;
4035
4036        ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4037                                "rbd", "dir_get_name",
4038                                image_id, image_id_size,
4039                                reply_buf, size);
4040        if (ret < 0)
4041                goto out;
4042        p = reply_buf;
4043        end = reply_buf + ret;
4044
4045        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4046        if (IS_ERR(image_name))
4047                image_name = NULL;
4048        else
4049                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4050out:
4051        kfree(reply_buf);
4052        kfree(image_id);
4053
4054        return image_name;
4055}
4056
4057static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4058{
4059        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4060        const char *snap_name;
4061        u32 which = 0;
4062
4063        /* Skip over names until we find the one we are looking for */
4064
4065        snap_name = rbd_dev->header.snap_names;
4066        while (which < snapc->num_snaps) {
4067                if (!strcmp(name, snap_name))
4068                        return snapc->snaps[which];
4069                snap_name += strlen(snap_name) + 1;
4070                which++;
4071        }
4072        return CEPH_NOSNAP;
4073}
4074
4075static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4076{
4077        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4078        u32 which;
4079        bool found = false;
4080        u64 snap_id;
4081
4082        for (which = 0; !found && which < snapc->num_snaps; which++) {
4083                const char *snap_name;
4084
4085                snap_id = snapc->snaps[which];
4086                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4087                if (IS_ERR(snap_name)) {
4088                        /* ignore no-longer existing snapshots */
4089                        if (PTR_ERR(snap_name) == -ENOENT)
4090                                continue;
4091                        else
4092                                break;
4093                }
4094                found = !strcmp(name, snap_name);
4095                kfree(snap_name);
4096        }
4097        return found ? snap_id : CEPH_NOSNAP;
4098}
4099
4100/*
4101 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4102 * no snapshot by that name is found, or if an error occurs.
4103 */
4104static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4105{
4106        if (rbd_dev->image_format == 1)
4107                return rbd_v1_snap_id_by_name(rbd_dev, name);
4108
4109        return rbd_v2_snap_id_by_name(rbd_dev, name);
4110}
4111
4112/*
4113 * When an rbd image has a parent image, it is identified by the
4114 * pool, image, and snapshot ids (not names).  This function fills
4115 * in the names for those ids.  (It's OK if we can't figure out the
4116 * name for an image id, but the pool and snapshot ids should always
4117 * exist and have names.)  All names in an rbd spec are dynamically
4118 * allocated.
4119 *
4120 * When an image being mapped (not a parent) is probed, we have the
4121 * pool name and pool id, image name and image id, and the snapshot
4122 * name.  The only thing we're missing is the snapshot id.
4123 */
4124static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4125{
4126        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4127        struct rbd_spec *spec = rbd_dev->spec;
4128        const char *pool_name;
4129        const char *image_name;
4130        const char *snap_name;
4131        int ret;
4132
4133        /*
4134         * An image being mapped will have the pool name (etc.), but
4135         * we need to look up the snapshot id.
4136         */
4137        if (spec->pool_name) {
4138                if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4139                        u64 snap_id;
4140
4141                        snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4142                        if (snap_id == CEPH_NOSNAP)
4143                                return -ENOENT;
4144                        spec->snap_id = snap_id;
4145                } else {
4146                        spec->snap_id = CEPH_NOSNAP;
4147                }
4148
4149                return 0;
4150        }
4151
4152        /* Get the pool name; we have to make our own copy of this */
4153
4154        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4155        if (!pool_name) {
4156                rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4157                return -EIO;
4158        }
4159        pool_name = kstrdup(pool_name, GFP_KERNEL);
4160        if (!pool_name)
4161                return -ENOMEM;
4162
4163        /* Fetch the image name; tolerate failure here */
4164
4165        image_name = rbd_dev_image_name(rbd_dev);
4166        if (!image_name)
4167                rbd_warn(rbd_dev, "unable to get image name");
4168
4169        /* Look up the snapshot name, and make a copy */
4170
4171        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4172        if (IS_ERR(snap_name)) {
4173                ret = PTR_ERR(snap_name);
4174                goto out_err;
4175        }
4176
4177        spec->pool_name = pool_name;
4178        spec->image_name = image_name;
4179        spec->snap_name = snap_name;
4180
4181        return 0;
4182out_err:
4183        kfree(image_name);
4184        kfree(pool_name);
4185
4186        return ret;
4187}
4188
4189static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4190{
4191        size_t size;
4192        int ret;
4193        void *reply_buf;
4194        void *p;
4195        void *end;
4196        u64 seq;
4197        u32 snap_count;
4198        struct ceph_snap_context *snapc;
4199        u32 i;
4200
4201        /*
4202         * We'll need room for the seq value (maximum snapshot id),
4203         * snapshot count, and array of that many snapshot ids.
4204         * For now we have a fixed upper limit on the number we're
4205         * prepared to receive.
4206         */
4207        size = sizeof (__le64) + sizeof (__le32) +
4208                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
4209        reply_buf = kzalloc(size, GFP_KERNEL);
4210        if (!reply_buf)
4211                return -ENOMEM;
4212
4213        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4214                                "rbd", "get_snapcontext", NULL, 0,
4215                                reply_buf, size);
4216        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4217        if (ret < 0)
4218                goto out;
4219
4220        p = reply_buf;
4221        end = reply_buf + ret;
4222        ret = -ERANGE;
4223        ceph_decode_64_safe(&p, end, seq, out);
4224        ceph_decode_32_safe(&p, end, snap_count, out);
4225
4226        /*
4227         * Make sure the reported number of snapshot ids wouldn't go
4228         * beyond the end of our buffer.  But before checking that,
4229         * make sure the computed size of the snapshot context we
4230         * allocate is representable in a size_t.
4231         */
4232        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4233                                 / sizeof (u64)) {
4234                ret = -EINVAL;
4235                goto out;
4236        }
4237        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4238                goto out;
4239        ret = 0;
4240
4241        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4242        if (!snapc) {
4243                ret = -ENOMEM;
4244                goto out;
4245        }
4246        snapc->seq = seq;
4247        for (i = 0; i < snap_count; i++)
4248                snapc->snaps[i] = ceph_decode_64(&p);
4249
4250        ceph_put_snap_context(rbd_dev->header.snapc);
4251        rbd_dev->header.snapc = snapc;
4252
4253        dout("  snap context seq = %llu, snap_count = %u\n",
4254                (unsigned long long)seq, (unsigned int)snap_count);
4255out:
4256        kfree(reply_buf);
4257
4258        return ret;
4259}
4260
4261static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4262                                        u64 snap_id)
4263{
4264        size_t size;
4265        void *reply_buf;
4266        __le64 snapid;
4267        int ret;
4268        void *p;
4269        void *end;
4270        char *snap_name;
4271
4272        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4273        reply_buf = kmalloc(size, GFP_KERNEL);
4274        if (!reply_buf)
4275                return ERR_PTR(-ENOMEM);
4276
4277        snapid = cpu_to_le64(snap_id);
4278        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4279                                "rbd", "get_snapshot_name",
4280                                &snapid, sizeof (snapid),
4281                                reply_buf, size);
4282        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4283        if (ret < 0) {
4284                snap_name = ERR_PTR(ret);
4285                goto out;
4286        }
4287
4288        p = reply_buf;
4289        end = reply_buf + ret;
4290        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4291        if (IS_ERR(snap_name))
4292                goto out;
4293
4294        dout("  snap_id 0x%016llx snap_name = %s\n",
4295                (unsigned long long)snap_id, snap_name);
4296out:
4297        kfree(reply_buf);
4298
4299        return snap_name;
4300}
4301
4302static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4303{
4304        bool first_time = rbd_dev->header.object_prefix == NULL;
4305        int ret;
4306
4307        ret = rbd_dev_v2_image_size(rbd_dev);
4308        if (ret)
4309                return ret;
4310
4311        if (first_time) {
4312                ret = rbd_dev_v2_header_onetime(rbd_dev);
4313                if (ret)
4314                        return ret;
4315        }
4316
4317        /*
4318         * If the image supports layering, get the parent info.  We
4319         * need to probe the first time regardless.  Thereafter we
4320         * only need to if there's a parent, to see if it has
4321         * disappeared due to the mapped image getting flattened.
4322         */
4323        if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4324                        (first_time || rbd_dev->parent_spec)) {
4325                bool warn;
4326
4327                ret = rbd_dev_v2_parent_info(rbd_dev);
4328                if (ret)
4329                        return ret;
4330
4331                /*
4332                 * Print a warning if this is the initial probe and
4333                 * the image has a parent.  Don't print it if the
4334                 * image now being probed is itself a parent.  We
4335                 * can tell at this point because we won't know its
4336                 * pool name yet (just its pool id).
4337                 */
4338                warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4339                if (first_time && warn)
4340                        rbd_warn(rbd_dev, "WARNING: kernel layering "
4341                                        "is EXPERIMENTAL!");
4342        }
4343
4344        if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4345                if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4346                        rbd_dev->mapping.size = rbd_dev->header.image_size;
4347
4348        ret = rbd_dev_v2_snap_context(rbd_dev);
4349        dout("rbd_dev_v2_snap_context returned %d\n", ret);
4350
4351        return ret;
4352}
4353
4354static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4355{
4356        struct device *dev;
4357        int ret;
4358
4359        dev = &rbd_dev->dev;
4360        dev->bus = &rbd_bus_type;
4361        dev->type = &rbd_device_type;
4362        dev->parent = &rbd_root_dev;
4363        dev->release = rbd_dev_device_release;
4364        dev_set_name(dev, "%d", rbd_dev->dev_id);
4365        ret = device_register(dev);
4366
4367        return ret;
4368}
4369
4370static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4371{
4372        device_unregister(&rbd_dev->dev);
4373}
4374
4375static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4376
4377/*
4378 * Get a unique rbd identifier for the given new rbd_dev, and add
4379 * the rbd_dev to the global list.  The minimum rbd id is 1.
4380 */
4381static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4382{
4383        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4384
4385        spin_lock(&rbd_dev_list_lock);
4386        list_add_tail(&rbd_dev->node, &rbd_dev_list);
4387        spin_unlock(&rbd_dev_list_lock);
4388        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4389                (unsigned long long) rbd_dev->dev_id);
4390}
4391
4392/*
4393 * Remove an rbd_dev from the global list, and record that its
4394 * identifier is no longer in use.
4395 */
4396static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4397{
4398        struct list_head *tmp;
4399        int rbd_id = rbd_dev->dev_id;
4400        int max_id;
4401
4402        rbd_assert(rbd_id > 0);
4403
4404        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4405                (unsigned long long) rbd_dev->dev_id);
4406        spin_lock(&rbd_dev_list_lock);
4407        list_del_init(&rbd_dev->node);
4408
4409        /*
4410         * If the id being "put" is not the current maximum, there
4411         * is nothing special we need to do.
4412         */
4413        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4414                spin_unlock(&rbd_dev_list_lock);
4415                return;
4416        }
4417
4418        /*
4419         * We need to update the current maximum id.  Search the
4420         * list to find out what it is.  We're more likely to find
4421         * the maximum at the end, so search the list backward.
4422         */
4423        max_id = 0;
4424        list_for_each_prev(tmp, &rbd_dev_list) {
4425                struct rbd_device *rbd_dev;
4426
4427                rbd_dev = list_entry(tmp, struct rbd_device, node);
4428                if (rbd_dev->dev_id > max_id)
4429                        max_id = rbd_dev->dev_id;
4430        }
4431        spin_unlock(&rbd_dev_list_lock);
4432
4433        /*
4434         * The max id could have been updated by rbd_dev_id_get(), in
4435         * which case it now accurately reflects the new maximum.
4436         * Be careful not to overwrite the maximum value in that
4437         * case.
4438         */
4439        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4440        dout("  max dev id has been reset\n");
4441}
4442
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token -- the run of non-white
 * space characters -- that starts there.  The string at *buf must be
 * terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;

	return strcspn(start, whitespace);
}
4461
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * the caller's token buffer followed by a terminating '\0'.  The
 * string at *buf must be terminated with '\0' on entry.
 *
 * Returns the length of the token (not counting the '\0').  That is
 * 0 when no token was found, and a value >= token_size when the token
 * did not fit, in which case the token buffer is left untouched.
 *
 * *buf is always moved to just past the token, whether or not the
 * token was copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *start = *buf;

	*buf = start + token_len;

	/* Need room for the token plus its terminator */
	if (token_len >= token_size)
		return token_len;

	memcpy(token, start, token_len);
	token[token_len] = '\0';

	return token_len;
}
4491
4492/*
4493 * Finds the next token in *buf, dynamically allocates a buffer big
4494 * enough to hold a copy of it, and copies the token into the new
4495 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4496 * that a duplicate buffer is created even for a zero-length token.
4497 *
4498 * Returns a pointer to the newly-allocated duplicate, or a null
4499 * pointer if memory for the duplicate was not available.  If
4500 * the lenp argument is a non-null pointer, the length of the token
4501 * (not including the '\0') is returned in *lenp.
4502 *
4503 * If successful, the *buf pointer will be updated to point beyond
4504 * the end of the found token.
4505 *
4506 * Note: uses GFP_KERNEL for allocation.
4507 */
4508static inline char *dup_token(const char **buf, size_t *lenp)
4509{
4510        char *dup;
4511        size_t len;
4512
4513        len = next_token(buf);
4514        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4515        if (!dup)
4516                return NULL;
4517        *(dup + len) = '\0';
4518        *buf += len;
4519
4520        if (lenp)
4521                *lenp = len;
4522
4523        return dup;
4524}
4525
4526/*
4527 * Parse the options provided for an "rbd add" (i.e., rbd image
4528 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4529 * and the data written is passed here via a NUL-terminated buffer.
4530 * Returns 0 if successful or an error code otherwise.
4531 *
4532 * The information extracted from these options is recorded in
4533 * the other parameters which return dynamically-allocated
4534 * structures:
4535 *  ceph_opts
4536 *      The address of a pointer that will refer to a ceph options
4537 *      structure.  Caller must release the returned pointer using
4538 *      ceph_destroy_options() when it is no longer needed.
4539 *  rbd_opts
4540 *      Address of an rbd options pointer.  Fully initialized by
4541 *      this function; caller must release with kfree().
4542 *  spec
4543 *      Address of an rbd image specification pointer.  Fully
4544 *      initialized by this function based on parsed options.
4545 *      Caller must release with rbd_spec_put().
4546 *
4547 * The options passed take this form:
4548 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4549 * where:
4550 *  <mon_addrs>
4551 *      A comma-separated list of one or more monitor addresses.
4552 *      A monitor address is an ip address, optionally followed
4553 *      by a port number (separated by a colon).
4554 *        I.e.:  ip1[:port1][,ip2[:port2]...]
4555 *  <options>
4556 *      A comma-separated list of ceph and/or rbd options.
4557 *  <pool_name>
4558 *      The name of the rados pool containing the rbd image.
4559 *  <image_name>
4560 *      The name of the image in that pool to map.
4561 *  <snap_id>
4562 *      An optional snapshot id.  If provided, the mapping will
4563 *      present data from the image at the time that snapshot was
4564 *      created.  The image head is used if no snapshot id is
4565 *      provided.  Snapshot mappings are always read-only.
4566 */
4567static int rbd_add_parse_args(const char *buf,
4568                                struct ceph_options **ceph_opts,
4569                                struct rbd_options **opts,
4570                                struct rbd_spec **rbd_spec)
4571{
4572        size_t len;
4573        char *options;
4574        const char *mon_addrs;
4575        char *snap_name;
4576        size_t mon_addrs_size;
4577        struct rbd_spec *spec = NULL;
4578        struct rbd_options *rbd_opts = NULL;
4579        struct ceph_options *copts;
4580        int ret;
4581
4582        /* The first four tokens are required */
4583
4584        len = next_token(&buf);
4585        if (!len) {
4586                rbd_warn(NULL, "no monitor address(es) provided");
4587                return -EINVAL;
4588        }
4589        mon_addrs = buf;
4590        mon_addrs_size = len + 1;
4591        buf += len;
4592
4593        ret = -EINVAL;
4594        options = dup_token(&buf, NULL);
4595        if (!options)
4596                return -ENOMEM;
4597        if (!*options) {
4598                rbd_warn(NULL, "no options provided");
4599                goto out_err;
4600        }
4601
4602        spec = rbd_spec_alloc();
4603        if (!spec)
4604                goto out_mem;
4605
4606        spec->pool_name = dup_token(&buf, NULL);
4607        if (!spec->pool_name)
4608                goto out_mem;
4609        if (!*spec->pool_name) {
4610                rbd_warn(NULL, "no pool name provided");
4611                goto out_err;
4612        }
4613
4614        spec->image_name = dup_token(&buf, NULL);
4615        if (!spec->image_name)
4616                goto out_mem;
4617        if (!*spec->image_name) {
4618                rbd_warn(NULL, "no image name provided");
4619                goto out_err;
4620        }
4621
4622        /*
4623         * Snapshot name is optional; default is to use "-"
4624         * (indicating the head/no snapshot).
4625         */
4626        len = next_token(&buf);
4627        if (!len) {
4628                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4629                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4630        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4631                ret = -ENAMETOOLONG;
4632                goto out_err;
4633        }
4634        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4635        if (!snap_name)
4636                goto out_mem;
4637        *(snap_name + len) = '\0';
4638        spec->snap_name = snap_name;
4639
4640        /* Initialize all rbd options to the defaults */
4641
4642        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4643        if (!rbd_opts)
4644                goto out_mem;
4645
4646        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4647
4648        copts = ceph_parse_options(options, mon_addrs,
4649                                        mon_addrs + mon_addrs_size - 1,
4650                                        parse_rbd_opts_token, rbd_opts);
4651        if (IS_ERR(copts)) {
4652                ret = PTR_ERR(copts);
4653                goto out_err;
4654        }
4655        kfree(options);
4656
4657        *ceph_opts = copts;
4658        *opts = rbd_opts;
4659        *rbd_spec = spec;
4660
4661        return 0;
4662out_mem:
4663        ret = -ENOMEM;
4664out_err:
4665        kfree(rbd_opts);
4666        rbd_spec_put(spec);
4667        kfree(options);
4668
4669        return ret;
4670}
4671
4672/*
4673 * An rbd format 2 image has a unique identifier, distinct from the
4674 * name given to it by the user.  Internally, that identifier is
4675 * what's used to specify the names of objects related to the image.
4676 *
4677 * A special "rbd id" object is used to map an rbd image name to its
4678 * id.  If that object doesn't exist, then there is no v2 rbd image
4679 * with the supplied name.
4680 *
4681 * This function will record the given rbd_dev's image_id field if
4682 * it can be determined, and in that case will return 0.  If any
4683 * errors occur a negative errno will be returned and the rbd_dev's
4684 * image_id field will be unchanged (and should be NULL).
4685 */
4686static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4687{
4688        int ret;
4689        size_t size;
4690        char *object_name;
4691        void *response;
4692        char *image_id;
4693
4694        /*
4695         * When probing a parent image, the image id is already
4696         * known (and the image name likely is not).  There's no
4697         * need to fetch the image id again in this case.  We
4698         * do still need to set the image format though.
4699         */
4700        if (rbd_dev->spec->image_id) {
4701                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4702
4703                return 0;
4704        }
4705
4706        /*
4707         * First, see if the format 2 image id file exists, and if
4708         * so, get the image's persistent id from it.
4709         */
4710        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4711        object_name = kmalloc(size, GFP_NOIO);
4712        if (!object_name)
4713                return -ENOMEM;
4714        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4715        dout("rbd id object name is %s\n", object_name);
4716
4717        /* Response will be an encoded string, which includes a length */
4718
4719        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4720        response = kzalloc(size, GFP_NOIO);
4721        if (!response) {
4722                ret = -ENOMEM;
4723                goto out;
4724        }
4725
4726        /* If it doesn't exist we'll assume it's a format 1 image */
4727
4728        ret = rbd_obj_method_sync(rbd_dev, object_name,
4729                                "rbd", "get_id", NULL, 0,
4730                                response, RBD_IMAGE_ID_LEN_MAX);
4731        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4732        if (ret == -ENOENT) {
4733                image_id = kstrdup("", GFP_KERNEL);
4734                ret = image_id ? 0 : -ENOMEM;
4735                if (!ret)
4736                        rbd_dev->image_format = 1;
4737        } else if (ret > sizeof (__le32)) {
4738                void *p = response;
4739
4740                image_id = ceph_extract_encoded_string(&p, p + ret,
4741                                                NULL, GFP_NOIO);
4742                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4743                if (!ret)
4744                        rbd_dev->image_format = 2;
4745        } else {
4746                ret = -EINVAL;
4747        }
4748
4749        if (!ret) {
4750                rbd_dev->spec->image_id = image_id;
4751                dout("image_id is %s\n", image_id);
4752        }
4753out:
4754        kfree(response);
4755        kfree(object_name);
4756
4757        return ret;
4758}
4759
4760/*
4761 * Undo whatever state changes are made by v1 or v2 header info
4762 * call.
4763 */
4764static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4765{
4766        struct rbd_image_header *header;
4767
4768        /* Drop parent reference unless it's already been done (or none) */
4769
4770        if (rbd_dev->parent_overlap)
4771                rbd_dev_parent_put(rbd_dev);
4772
4773        /* Free dynamic fields from the header, then zero it out */
4774
4775        header = &rbd_dev->header;
4776        ceph_put_snap_context(header->snapc);
4777        kfree(header->snap_sizes);
4778        kfree(header->snap_names);
4779        kfree(header->object_prefix);
4780        memset(header, 0, sizeof (*header));
4781}
4782
4783static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4784{
4785        int ret;
4786
4787        ret = rbd_dev_v2_object_prefix(rbd_dev);
4788        if (ret)
4789                goto out_err;
4790
4791        /*
4792         * Get the and check features for the image.  Currently the
4793         * features are assumed to never change.
4794         */
4795        ret = rbd_dev_v2_features(rbd_dev);
4796        if (ret)
4797                goto out_err;
4798
4799        /* If the image supports fancy striping, get its parameters */
4800
4801        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4802                ret = rbd_dev_v2_striping_info(rbd_dev);
4803                if (ret < 0)
4804                        goto out_err;
4805        }
4806        /* No support for crypto and compression type format 2 images */
4807
4808        return 0;
4809out_err:
4810        rbd_dev->header.features = 0;
4811        kfree(rbd_dev->header.object_prefix);
4812        rbd_dev->header.object_prefix = NULL;
4813
4814        return ret;
4815}
4816
4817static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4818{
4819        struct rbd_device *parent = NULL;
4820        struct rbd_spec *parent_spec;
4821        struct rbd_client *rbdc;
4822        int ret;
4823
4824        if (!rbd_dev->parent_spec)
4825                return 0;
4826        /*
4827         * We need to pass a reference to the client and the parent
4828         * spec when creating the parent rbd_dev.  Images related by
4829         * parent/child relationships always share both.
4830         */
4831        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4832        rbdc = __rbd_get_client(rbd_dev->rbd_client);
4833
4834        ret = -ENOMEM;
4835        parent = rbd_dev_create(rbdc, parent_spec);
4836        if (!parent)
4837                goto out_err;
4838
4839        ret = rbd_dev_image_probe(parent, false);
4840        if (ret < 0)
4841                goto out_err;
4842        rbd_dev->parent = parent;
4843        atomic_set(&rbd_dev->parent_ref, 1);
4844
4845        return 0;
4846out_err:
4847        if (parent) {
4848                rbd_dev_unparent(rbd_dev);
4849                kfree(rbd_dev->header_name);
4850                rbd_dev_destroy(parent);
4851        } else {
4852                rbd_put_client(rbdc);
4853                rbd_spec_put(parent_spec);
4854        }
4855
4856        return ret;
4857}
4858
4859static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4860{
4861        int ret;
4862
4863        /* generate unique id: find highest unique id, add one */
4864        rbd_dev_id_get(rbd_dev);
4865
4866        /* Fill in the device name, now that we have its id. */
4867        BUILD_BUG_ON(DEV_NAME_LEN
4868                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4869        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4870
4871        /* Get our block major device number. */
4872
4873        ret = register_blkdev(0, rbd_dev->name);
4874        if (ret < 0)
4875                goto err_out_id;
4876        rbd_dev->major = ret;
4877
4878        /* Set up the blkdev mapping. */
4879
4880        ret = rbd_init_disk(rbd_dev);
4881        if (ret)
4882                goto err_out_blkdev;
4883
4884        ret = rbd_dev_mapping_set(rbd_dev);
4885        if (ret)
4886                goto err_out_disk;
4887        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4888
4889        ret = rbd_bus_add_dev(rbd_dev);
4890        if (ret)
4891                goto err_out_mapping;
4892
4893        /* Everything's ready.  Announce the disk to the world. */
4894
4895        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4896        add_disk(rbd_dev->disk);
4897
4898        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4899                (unsigned long long) rbd_dev->mapping.size);
4900
4901        return ret;
4902
4903err_out_mapping:
4904        rbd_dev_mapping_clear(rbd_dev);
4905err_out_disk:
4906        rbd_free_disk(rbd_dev);
4907err_out_blkdev:
4908        unregister_blkdev(rbd_dev->major, rbd_dev->name);
4909err_out_id:
4910        rbd_dev_id_put(rbd_dev);
4911        rbd_dev_mapping_clear(rbd_dev);
4912
4913        return ret;
4914}
4915
4916static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4917{
4918        struct rbd_spec *spec = rbd_dev->spec;
4919        size_t size;
4920
4921        /* Record the header object name for this rbd image. */
4922
4923        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4924
4925        if (rbd_dev->image_format == 1)
4926                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4927        else
4928                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4929
4930        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4931        if (!rbd_dev->header_name)
4932                return -ENOMEM;
4933
4934        if (rbd_dev->image_format == 1)
4935                sprintf(rbd_dev->header_name, "%s%s",
4936                        spec->image_name, RBD_SUFFIX);
4937        else
4938                sprintf(rbd_dev->header_name, "%s%s",
4939                        RBD_HEADER_PREFIX, spec->image_id);
4940        return 0;
4941}
4942
4943static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4944{
4945        rbd_dev_unprobe(rbd_dev);
4946        kfree(rbd_dev->header_name);
4947        rbd_dev->header_name = NULL;
4948        rbd_dev->image_format = 0;
4949        kfree(rbd_dev->spec->image_id);
4950        rbd_dev->spec->image_id = NULL;
4951
4952        rbd_dev_destroy(rbd_dev);
4953}
4954
4955/*
4956 * Probe for the existence of the header object for the given rbd
4957 * device.  If this image is the one being mapped (i.e., not a
4958 * parent), initiate a watch on its header object before using that
4959 * object to get detailed information about the rbd image.
4960 */
4961static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4962{
4963        int ret;
4964        int tmp;
4965
4966        /*
4967         * Get the id from the image id object.  Unless there's an
4968         * error, rbd_dev->spec->image_id will be filled in with
4969         * a dynamically-allocated string, and rbd_dev->image_format
4970         * will be set to either 1 or 2.
4971         */
4972        ret = rbd_dev_image_id(rbd_dev);
4973        if (ret)
4974                return ret;
4975        rbd_assert(rbd_dev->spec->image_id);
4976        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4977
4978        ret = rbd_dev_header_name(rbd_dev);
4979        if (ret)
4980                goto err_out_format;
4981
4982        if (mapping) {
4983                ret = rbd_dev_header_watch_sync(rbd_dev, true);
4984                if (ret)
4985                        goto out_header_name;
4986        }
4987
4988        if (rbd_dev->image_format == 1)
4989                ret = rbd_dev_v1_header_info(rbd_dev);
4990        else
4991                ret = rbd_dev_v2_header_info(rbd_dev);
4992        if (ret)
4993                goto err_out_watch;
4994
4995        ret = rbd_dev_spec_update(rbd_dev);
4996        if (ret)
4997                goto err_out_probe;
4998
4999        ret = rbd_dev_probe_parent(rbd_dev);
5000        if (ret)
5001                goto err_out_probe;
5002
5003        dout("discovered format %u image, header name is %s\n",
5004                rbd_dev->image_format, rbd_dev->header_name);
5005
5006        return 0;
5007err_out_probe:
5008        rbd_dev_unprobe(rbd_dev);
5009err_out_watch:
5010        if (mapping) {
5011                tmp = rbd_dev_header_watch_sync(rbd_dev, false);
5012                if (tmp)
5013                        rbd_warn(rbd_dev, "unable to tear down "
5014                                        "watch request (%d)\n", tmp);
5015        }
5016out_header_name:
5017        kfree(rbd_dev->header_name);
5018        rbd_dev->header_name = NULL;
5019err_out_format:
5020        rbd_dev->image_format = 0;
5021        kfree(rbd_dev->spec->image_id);
5022        rbd_dev->spec->image_id = NULL;
5023
5024        dout("probe failed, returning %d\n", ret);
5025
5026        return ret;
5027}
5028
5029static ssize_t rbd_add(struct bus_type *bus,
5030                       const char *buf,
5031                       size_t count)
5032{
5033        struct rbd_device *rbd_dev = NULL;
5034        struct ceph_options *ceph_opts = NULL;
5035        struct rbd_options *rbd_opts = NULL;
5036        struct rbd_spec *spec = NULL;
5037        struct rbd_client *rbdc;
5038        struct ceph_osd_client *osdc;
5039        bool read_only;
5040        int rc = -ENOMEM;
5041
5042        if (!try_module_get(THIS_MODULE))
5043                return -ENODEV;
5044
5045        /* parse add command */
5046        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5047        if (rc < 0)
5048                goto err_out_module;
5049        read_only = rbd_opts->read_only;
5050        kfree(rbd_opts);
5051        rbd_opts = NULL;        /* done with this */
5052
5053        rbdc = rbd_get_client(ceph_opts);
5054        if (IS_ERR(rbdc)) {
5055                rc = PTR_ERR(rbdc);
5056                goto err_out_args;
5057        }
5058
5059        /* pick the pool */
5060        osdc = &rbdc->client->osdc;
5061        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5062        if (rc < 0)
5063                goto err_out_client;
5064        spec->pool_id = (u64)rc;
5065
5066        /* The ceph file layout needs to fit pool id in 32 bits */
5067
5068        if (spec->pool_id > (u64)U32_MAX) {
5069                rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5070                                (unsigned long long)spec->pool_id, U32_MAX);
5071                rc = -EIO;
5072                goto err_out_client;
5073        }
5074
5075        rbd_dev = rbd_dev_create(rbdc, spec);
5076        if (!rbd_dev)
5077                goto err_out_client;
5078        rbdc = NULL;            /* rbd_dev now owns this */
5079        spec = NULL;            /* rbd_dev now owns this */
5080
5081        rc = rbd_dev_image_probe(rbd_dev, true);
5082        if (rc < 0)
5083                goto err_out_rbd_dev;
5084
5085        /* If we are mapping a snapshot it must be marked read-only */
5086
5087        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5088                read_only = true;
5089        rbd_dev->mapping.read_only = read_only;
5090
5091        rc = rbd_dev_device_setup(rbd_dev);
5092        if (rc) {
5093                rbd_dev_image_release(rbd_dev);
5094                goto err_out_module;
5095        }
5096
5097        return count;
5098
5099err_out_rbd_dev:
5100        rbd_dev_destroy(rbd_dev);
5101err_out_client:
5102        rbd_put_client(rbdc);
5103err_out_args:
5104        rbd_spec_put(spec);
5105err_out_module:
5106        module_put(THIS_MODULE);
5107
5108        dout("Error adding device %s\n", buf);
5109
5110        return (ssize_t)rc;
5111}
5112
5113static void rbd_dev_device_release(struct device *dev)
5114{
5115        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5116
5117        rbd_free_disk(rbd_dev);
5118        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5119        rbd_dev_mapping_clear(rbd_dev);
5120        unregister_blkdev(rbd_dev->major, rbd_dev->name);
5121        rbd_dev->major = 0;
5122        rbd_dev_id_put(rbd_dev);
5123        rbd_dev_mapping_clear(rbd_dev);
5124}
5125
5126static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5127{
5128        while (rbd_dev->parent) {
5129                struct rbd_device *first = rbd_dev;
5130                struct rbd_device *second = first->parent;
5131                struct rbd_device *third;
5132
5133                /*
5134                 * Follow to the parent with no grandparent and
5135                 * remove it.
5136                 */
5137                while (second && (third = second->parent)) {
5138                        first = second;
5139                        second = third;
5140                }
5141                rbd_assert(second);
5142                rbd_dev_image_release(second);
5143                first->parent = NULL;
5144                first->parent_overlap = 0;
5145
5146                rbd_assert(first->parent_spec);
5147                rbd_spec_put(first->parent_spec);
5148                first->parent_spec = NULL;
5149        }
5150}
5151
5152static ssize_t rbd_remove(struct bus_type *bus,
5153                          const char *buf,
5154                          size_t count)
5155{
5156        struct rbd_device *rbd_dev = NULL;
5157        struct list_head *tmp;
5158        int dev_id;
5159        unsigned long ul;
5160        bool already = false;
5161        int ret;
5162
5163        ret = kstrtoul(buf, 10, &ul);
5164        if (ret)
5165                return ret;
5166
5167        /* convert to int; abort if we lost anything in the conversion */
5168        dev_id = (int)ul;
5169        if (dev_id != ul)
5170                return -EINVAL;
5171
5172        ret = -ENOENT;
5173        spin_lock(&rbd_dev_list_lock);
5174        list_for_each(tmp, &rbd_dev_list) {
5175                rbd_dev = list_entry(tmp, struct rbd_device, node);
5176                if (rbd_dev->dev_id == dev_id) {
5177                        ret = 0;
5178                        break;
5179                }
5180        }
5181        if (!ret) {
5182                spin_lock_irq(&rbd_dev->lock);
5183                if (rbd_dev->open_count)
5184                        ret = -EBUSY;
5185                else
5186                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5187                                                        &rbd_dev->flags);
5188                spin_unlock_irq(&rbd_dev->lock);
5189        }
5190        spin_unlock(&rbd_dev_list_lock);
5191        if (ret < 0 || already)
5192                return ret;
5193
5194        ret = rbd_dev_header_watch_sync(rbd_dev, false);
5195        if (ret)
5196                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5197
5198        /*
5199         * flush remaining watch callbacks - these must be complete
5200         * before the osd_client is shutdown
5201         */
5202        dout("%s: flushing notifies", __func__);
5203        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5204        /*
5205         * Don't free anything from rbd_dev->disk until after all
5206         * notifies are completely processed. Otherwise
5207         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5208         * in a potential use after free of rbd_dev->disk or rbd_dev.
5209         */
5210        rbd_bus_del_dev(rbd_dev);
5211        rbd_dev_image_release(rbd_dev);
5212        module_put(THIS_MODULE);
5213
5214        return count;
5215}
5216
5217/*
5218 * create control files in sysfs
5219 * /sys/bus/rbd/...
5220 */
5221static int rbd_sysfs_init(void)
5222{
5223        int ret;
5224
5225        ret = device_register(&rbd_root_dev);
5226        if (ret < 0)
5227                return ret;
5228
5229        ret = bus_register(&rbd_bus_type);
5230        if (ret < 0)
5231                device_unregister(&rbd_root_dev);
5232
5233        return ret;
5234}
5235
5236static void rbd_sysfs_cleanup(void)
5237{
5238        bus_unregister(&rbd_bus_type);
5239        device_unregister(&rbd_root_dev);
5240}
5241
5242static int rbd_slab_init(void)
5243{
5244        rbd_assert(!rbd_img_request_cache);
5245        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5246                                        sizeof (struct rbd_img_request),
5247                                        __alignof__(struct rbd_img_request),
5248                                        0, NULL);
5249        if (!rbd_img_request_cache)
5250                return -ENOMEM;
5251
5252        rbd_assert(!rbd_obj_request_cache);
5253        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5254                                        sizeof (struct rbd_obj_request),
5255                                        __alignof__(struct rbd_obj_request),
5256                                        0, NULL);
5257        if (!rbd_obj_request_cache)
5258                goto out_err;
5259
5260        rbd_assert(!rbd_segment_name_cache);
5261        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5262                                        MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5263        if (rbd_segment_name_cache)
5264                return 0;
5265out_err:
5266        if (rbd_obj_request_cache) {
5267                kmem_cache_destroy(rbd_obj_request_cache);
5268                rbd_obj_request_cache = NULL;
5269        }
5270
5271        kmem_cache_destroy(rbd_img_request_cache);
5272        rbd_img_request_cache = NULL;
5273
5274        return -ENOMEM;
5275}
5276
5277static void rbd_slab_exit(void)
5278{
5279        rbd_assert(rbd_segment_name_cache);
5280        kmem_cache_destroy(rbd_segment_name_cache);
5281        rbd_segment_name_cache = NULL;
5282
5283        rbd_assert(rbd_obj_request_cache);
5284        kmem_cache_destroy(rbd_obj_request_cache);
5285        rbd_obj_request_cache = NULL;
5286
5287        rbd_assert(rbd_img_request_cache);
5288        kmem_cache_destroy(rbd_img_request_cache);
5289        rbd_img_request_cache = NULL;
5290}
5291
5292static int __init rbd_init(void)
5293{
5294        int rc;
5295
5296        if (!libceph_compatible(NULL)) {
5297                rbd_warn(NULL, "libceph incompatibility (quitting)");
5298
5299                return -EINVAL;
5300        }
5301        rc = rbd_slab_init();
5302        if (rc)
5303                return rc;
5304        rc = rbd_sysfs_init();
5305        if (rc)
5306                rbd_slab_exit();
5307        else
5308                pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5309
5310        return rc;
5311}
5312
5313static void __exit rbd_exit(void)
5314{
5315        rbd_sysfs_cleanup();
5316        rbd_slab_exit();
5317}
5318
5319module_init(rbd_init);
5320module_exit(rbd_exit);
5321
5322MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5323MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5324MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5325MODULE_DESCRIPTION("rados block device");
5326
5327/* following authorship retained from original osdblk.c */
5328MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5329
5330MODULE_LICENSE("GPL");
5331