linux/drivers/block/rbd.c
<<
>>
Prefs
   1/*
   2   rbd.c -- Export ceph rados objects as a Linux block device
   3
   4
   5   based on drivers/block/osdblk.c:
   6
   7   Copyright 2009 Red Hat, Inc.
   8
   9   This program is free software; you can redistribute it and/or modify
  10   it under the terms of the GNU General Public License as published by
  11   the Free Software Foundation.
  12
  13   This program is distributed in the hope that it will be useful,
  14   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16   GNU General Public License for more details.
  17
  18   You should have received a copy of the GNU General Public License
  19   along with this program; see the file COPYING.  If not, write to
  20   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  21
  22
  23
  24   For usage instructions, please refer to:
  25
  26                 Documentation/ABI/testing/sysfs-bus-rbd
  27
  28 */
  29
  30#include <linux/ceph/libceph.h>
  31#include <linux/ceph/osd_client.h>
  32#include <linux/ceph/mon_client.h>
  33#include <linux/ceph/decode.h>
  34#include <linux/parser.h>
  35
  36#include <linux/kernel.h>
  37#include <linux/device.h>
  38#include <linux/module.h>
  39#include <linux/fs.h>
  40#include <linux/blkdev.h>
  41
  42#include "rbd_types.h"
  43
  44/*
  45 * The basic unit of block I/O is a sector.  It is interpreted in a
  46 * number of contexts in Linux (blk, bio, genhd), but the default is
  47 * universally 512 bytes.  These symbols are just slightly more
  48 * meaningful than the bare numbers they represent.
  49 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/*
 * Buffer sizes.  RBD_MAX_MD_NAME_LEN, RBD_MAX_POOL_NAME_LEN and
 * RBD_MAX_SNAP_NAME_LEN size the obj_md_name, pool_name and snap_name
 * arrays in struct rbd_device respectively.
 */
#define RBD_MAX_MD_NAME_LEN     (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN   64
#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

/*
 * Snapshot name that selects the image head rather than a snapshot;
 * rbd_header_set_snap() maps it to CEPH_NOSNAP and a writable device.
 */
#define RBD_SNAP_HEAD_NAME      "-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/* Initial value of rbd_options.notify_timeout (units not visible here) */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
  75
/*
 * block device image metadata (in-memory version)
 *
 * Filled in from the on-disk header by rbd_header_from_disk() and
 * released with rbd_header_free().
 */
struct rbd_image_header {
	u64 image_size;		/* from the on-disk image_size field */
	char block_name[32];	/* copied verbatim from the on-disk header */
	__u8 obj_order;		/* objects are (1 << obj_order) bytes */
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;	/* holds the snapshot ids */
	size_t snap_names_len;	/* size in bytes of snap_names */
	u64 snap_seq;
	u32 total_snaps;	/* number of snapshots in the header */

	char *snap_names;	/* names packed back to back, each NUL-terminated */
	u64 *snap_sizes;	/* image size of each snapshot, same order as ids */

	u64 obj_version;
};
  95
/*
 * rbd-specific options, filled in by parse_rbd_opts_token().
 */
struct rbd_options {
	int	notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
  99
/*
 * an instance of the client.  multiple devices may share an rbd client.
 *
 * Reference counted through kref; rbd_client_release() runs when the
 * last reference is dropped.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;	/* freed in rbd_client_release() */
	struct kref		kref;
	struct list_head	node;		/* entry in rbd_client_list */
};
 109
/*
 * a request completion status
 *
 * One slot per sub-request of a collection; written by
 * rbd_coll_end_req_index().
 */
struct rbd_req_status {
	int done;	/* set to 1 once the sub-request has finished */
	int rc;		/* result passed to __blk_end_request() */
	u64 bytes;	/* byte count passed to __blk_end_request() */
};
 118
 119/*
 120 * a collection of requests
 121 */
 122struct rbd_req_coll {
 123        int                     total;
 124        int                     num_done;
 125        struct kref             kref;
 126        struct rbd_req_status   status[0];
 127};
 128
/*
 * a single io request
 *
 * Allocated per OSD request in rbd_do_request() and attached to it
 * through r_priv.
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* requested length in bytes */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* owning collection, may be NULL */
};
 140
/*
 * A snapshot of an rbd image, with its own embedded struct device.
 */
struct rbd_snap {
	struct	device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
};
 148
/*
 * a single device
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_client	*rbd_client;	/* reference dropped by rbd_put_client() */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	char			snap_name[RBD_MAX_SNAP_NAME_LEN];
	u64			snap_id;	/* current snapshot id */
	int read_only;		/* 1 when a snapshot is mapped, 0 for the head */

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
 189
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);	  /* devices */
/* presumably guards rbd_dev_list; its users are not in this part of the file */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
/* taken around every rbd_client_list lookup, insertion and removal below */
static DEFINE_SPINLOCK(rbd_client_list_lock);
 197
/* Forward declarations for functions referenced before they are defined */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);

/* Store handlers for the bus "add" and "remove" attributes */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
 211
/*
 * Bus attributes "add" and "remove": owner-writable only (S_IWUSR),
 * with no show handler; writes go to rbd_add() and rbd_remove().
 */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

/* The "rbd" bus, carrying the attributes above */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
 222
/*
 * Release callback for rbd_root_dev.  Intentionally empty: the device
 * is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
 226
/* Statically allocated device named "rbd" with a no-op release callback */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
 231
 232
 233static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 234{
 235        return get_device(&rbd_dev->dev);
 236}
 237
 238static void rbd_put_dev(struct rbd_device *rbd_dev)
 239{
 240        put_device(&rbd_dev->dev);
 241}
 242
 243static int __rbd_refresh_header(struct rbd_device *rbd_dev);
 244
 245static int rbd_open(struct block_device *bdev, fmode_t mode)
 246{
 247        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 248
 249        rbd_get_dev(rbd_dev);
 250
 251        set_device_ro(bdev, rbd_dev->read_only);
 252
 253        if ((mode & FMODE_WRITE) && rbd_dev->read_only)
 254                return -EROFS;
 255
 256        return 0;
 257}
 258
 259static int rbd_release(struct gendisk *disk, fmode_t mode)
 260{
 261        struct rbd_device *rbd_dev = disk->private_data;
 262
 263        rbd_put_dev(rbd_dev);
 264
 265        return 0;
 266}
 267
/* Block device operations: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
 273
 274/*
 275 * Initialize an rbd client instance.
 276 * We own *opt.
 277 */
 278static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 279                                            struct rbd_options *rbd_opts)
 280{
 281        struct rbd_client *rbdc;
 282        int ret = -ENOMEM;
 283
 284        dout("rbd_client_create\n");
 285        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 286        if (!rbdc)
 287                goto out_opt;
 288
 289        kref_init(&rbdc->kref);
 290        INIT_LIST_HEAD(&rbdc->node);
 291
 292        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 293
 294        rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
 295        if (IS_ERR(rbdc->client))
 296                goto out_mutex;
 297        opt = NULL; /* Now rbdc->client is responsible for opt */
 298
 299        ret = ceph_open_session(rbdc->client);
 300        if (ret < 0)
 301                goto out_err;
 302
 303        rbdc->rbd_opts = rbd_opts;
 304
 305        spin_lock(&rbd_client_list_lock);
 306        list_add_tail(&rbdc->node, &rbd_client_list);
 307        spin_unlock(&rbd_client_list_lock);
 308
 309        mutex_unlock(&ctl_mutex);
 310
 311        dout("rbd_client_create created %p\n", rbdc);
 312        return rbdc;
 313
 314out_err:
 315        ceph_destroy_client(rbdc->client);
 316out_mutex:
 317        mutex_unlock(&ctl_mutex);
 318        kfree(rbdc);
 319out_opt:
 320        if (opt)
 321                ceph_destroy_options(opt);
 322        return ERR_PTR(ret);
 323}
 324
 325/*
 326 * Find a ceph client with specific addr and configuration.
 327 */
 328static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
 329{
 330        struct rbd_client *client_node;
 331
 332        if (opt->flags & CEPH_OPT_NOSHARE)
 333                return NULL;
 334
 335        list_for_each_entry(client_node, &rbd_client_list, node)
 336                if (ceph_compare_options(opt, client_node->client) == 0)
 337                        return client_node;
 338        return NULL;
 339}
 340
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument; values
 * between Opt_last_int and Opt_last_string take a string argument
 * (there are none at present).  parse_rbd_opts_token() relies on this
 * ordering.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* Patterns handed to match_token(); the {-1, NULL} entry ends the table */
static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
 358
 359static int parse_rbd_opts_token(char *c, void *private)
 360{
 361        struct rbd_options *rbdopt = private;
 362        substring_t argstr[MAX_OPT_ARGS];
 363        int token, intval, ret;
 364
 365        token = match_token(c, rbdopt_tokens, argstr);
 366        if (token < 0)
 367                return -EINVAL;
 368
 369        if (token < Opt_last_int) {
 370                ret = match_int(&argstr[0], &intval);
 371                if (ret < 0) {
 372                        pr_err("bad mount option arg (not int) "
 373                               "at '%s'\n", c);
 374                        return ret;
 375                }
 376                dout("got int token %d val %d\n", token, intval);
 377        } else if (token > Opt_last_int && token < Opt_last_string) {
 378                dout("got string token %d val %s\n", token,
 379                     argstr[0].from);
 380        } else {
 381                dout("got token %d\n", token);
 382        }
 383
 384        switch (token) {
 385        case Opt_notify_timeout:
 386                rbdopt->notify_timeout = intval;
 387                break;
 388        default:
 389                BUG_ON(token);
 390        }
 391        return 0;
 392}
 393
 394/*
 395 * Get a ceph client with specific addr and configuration, if one does
 396 * not exist create it.
 397 */
 398static struct rbd_client *rbd_get_client(const char *mon_addr,
 399                                         size_t mon_addr_len,
 400                                         char *options)
 401{
 402        struct rbd_client *rbdc;
 403        struct ceph_options *opt;
 404        struct rbd_options *rbd_opts;
 405
 406        rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
 407        if (!rbd_opts)
 408                return ERR_PTR(-ENOMEM);
 409
 410        rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 411
 412        opt = ceph_parse_options(options, mon_addr,
 413                                mon_addr + mon_addr_len,
 414                                parse_rbd_opts_token, rbd_opts);
 415        if (IS_ERR(opt)) {
 416                kfree(rbd_opts);
 417                return ERR_CAST(opt);
 418        }
 419
 420        spin_lock(&rbd_client_list_lock);
 421        rbdc = __rbd_client_find(opt);
 422        if (rbdc) {
 423                /* using an existing client */
 424                kref_get(&rbdc->kref);
 425                spin_unlock(&rbd_client_list_lock);
 426
 427                ceph_destroy_options(opt);
 428                kfree(rbd_opts);
 429
 430                return rbdc;
 431        }
 432        spin_unlock(&rbd_client_list_lock);
 433
 434        rbdc = rbd_client_create(opt, rbd_opts);
 435
 436        if (IS_ERR(rbdc))
 437                kfree(rbd_opts);
 438
 439        return rbdc;
 440}
 441
 442/*
 443 * Destroy ceph client
 444 *
 445 * Caller must hold rbd_client_list_lock.
 446 */
 447static void rbd_client_release(struct kref *kref)
 448{
 449        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 450
 451        dout("rbd_release_client %p\n", rbdc);
 452        spin_lock(&rbd_client_list_lock);
 453        list_del(&rbdc->node);
 454        spin_unlock(&rbd_client_list_lock);
 455
 456        ceph_destroy_client(rbdc->client);
 457        kfree(rbdc->rbd_opts);
 458        kfree(rbdc);
 459}
 460
 461/*
 462 * Drop reference to ceph client node. If it's not referenced anymore, release
 463 * it.
 464 */
 465static void rbd_put_client(struct rbd_device *rbd_dev)
 466{
 467        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
 468        rbd_dev->rbd_client = NULL;
 469}
 470
 471/*
 472 * Destroy requests collection
 473 */
 474static void rbd_coll_release(struct kref *kref)
 475{
 476        struct rbd_req_coll *coll =
 477                container_of(kref, struct rbd_req_coll, kref);
 478
 479        dout("rbd_coll_release %p\n", coll);
 480        kfree(coll);
 481}
 482
 483/*
 484 * Create a new header structure, translate header format from the on-disk
 485 * header.
 486 */
 487static int rbd_header_from_disk(struct rbd_image_header *header,
 488                                 struct rbd_image_header_ondisk *ondisk,
 489                                 u32 allocated_snaps,
 490                                 gfp_t gfp_flags)
 491{
 492        u32 i, snap_count;
 493
 494        if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
 495                return -ENXIO;
 496
 497        snap_count = le32_to_cpu(ondisk->snap_count);
 498        if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
 499                         / sizeof (*ondisk))
 500                return -EINVAL;
 501        header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
 502                                snap_count * sizeof(u64),
 503                                gfp_flags);
 504        if (!header->snapc)
 505                return -ENOMEM;
 506
 507        header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 508        if (snap_count) {
 509                header->snap_names = kmalloc(header->snap_names_len,
 510                                             gfp_flags);
 511                if (!header->snap_names)
 512                        goto err_snapc;
 513                header->snap_sizes = kmalloc(snap_count * sizeof(u64),
 514                                             gfp_flags);
 515                if (!header->snap_sizes)
 516                        goto err_names;
 517        } else {
 518                header->snap_names = NULL;
 519                header->snap_sizes = NULL;
 520        }
 521        memcpy(header->block_name, ondisk->block_name,
 522               sizeof(ondisk->block_name));
 523
 524        header->image_size = le64_to_cpu(ondisk->image_size);
 525        header->obj_order = ondisk->options.order;
 526        header->crypt_type = ondisk->options.crypt_type;
 527        header->comp_type = ondisk->options.comp_type;
 528
 529        atomic_set(&header->snapc->nref, 1);
 530        header->snap_seq = le64_to_cpu(ondisk->snap_seq);
 531        header->snapc->num_snaps = snap_count;
 532        header->total_snaps = snap_count;
 533
 534        if (snap_count && allocated_snaps == snap_count) {
 535                for (i = 0; i < snap_count; i++) {
 536                        header->snapc->snaps[i] =
 537                                le64_to_cpu(ondisk->snaps[i].id);
 538                        header->snap_sizes[i] =
 539                                le64_to_cpu(ondisk->snaps[i].image_size);
 540                }
 541
 542                /* copy snapshot names */
 543                memcpy(header->snap_names, &ondisk->snaps[i],
 544                        header->snap_names_len);
 545        }
 546
 547        return 0;
 548
 549err_names:
 550        kfree(header->snap_names);
 551err_snapc:
 552        kfree(header->snapc);
 553        return -ENOMEM;
 554}
 555
 556static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
 557                        u64 *seq, u64 *size)
 558{
 559        int i;
 560        char *p = header->snap_names;
 561
 562        for (i = 0; i < header->total_snaps; i++) {
 563                if (!strcmp(snap_name, p)) {
 564
 565                        /* Found it.  Pass back its id and/or size */
 566
 567                        if (seq)
 568                                *seq = header->snapc->snaps[i];
 569                        if (size)
 570                                *size = header->snap_sizes[i];
 571                        return i;
 572                }
 573                p += strlen(p) + 1;     /* Skip ahead to the next name */
 574        }
 575        return -ENOENT;
 576}
 577
 578static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
 579{
 580        struct rbd_image_header *header = &dev->header;
 581        struct ceph_snap_context *snapc = header->snapc;
 582        int ret = -ENOENT;
 583
 584        BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
 585
 586        down_write(&dev->header_rwsem);
 587
 588        if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
 589                    sizeof (RBD_SNAP_HEAD_NAME))) {
 590                if (header->total_snaps)
 591                        snapc->seq = header->snap_seq;
 592                else
 593                        snapc->seq = 0;
 594                dev->snap_id = CEPH_NOSNAP;
 595                dev->read_only = 0;
 596                if (size)
 597                        *size = header->image_size;
 598        } else {
 599                ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
 600                if (ret < 0)
 601                        goto done;
 602                dev->snap_id = snapc->seq;
 603                dev->read_only = 1;
 604        }
 605
 606        ret = 0;
 607done:
 608        up_write(&dev->header_rwsem);
 609        return ret;
 610}
 611
 612static void rbd_header_free(struct rbd_image_header *header)
 613{
 614        kfree(header->snapc);
 615        kfree(header->snap_names);
 616        kfree(header->snap_sizes);
 617}
 618
 619/*
 620 * get the actual striped segment name, offset and length
 621 */
 622static u64 rbd_get_segment(struct rbd_image_header *header,
 623                           const char *block_name,
 624                           u64 ofs, u64 len,
 625                           char *seg_name, u64 *segofs)
 626{
 627        u64 seg = ofs >> header->obj_order;
 628
 629        if (seg_name)
 630                snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
 631                         "%s.%012llx", block_name, seg);
 632
 633        ofs = ofs & ((1 << header->obj_order) - 1);
 634        len = min_t(u64, len, (1 << header->obj_order) - ofs);
 635
 636        if (segofs)
 637                *segofs = ofs;
 638
 639        return len;
 640}
 641
 642static int rbd_get_num_segments(struct rbd_image_header *header,
 643                                u64 ofs, u64 len)
 644{
 645        u64 start_seg = ofs >> header->obj_order;
 646        u64 end_seg = (ofs + len - 1) >> header->obj_order;
 647        return end_seg - start_seg + 1;
 648}
 649
 650/*
 651 * returns the size of an object in the image
 652 */
 653static u64 rbd_obj_bytes(struct rbd_image_header *header)
 654{
 655        return 1 << header->obj_order;
 656}
 657
 658/*
 659 * bio helpers
 660 */
 661
 662static void bio_chain_put(struct bio *chain)
 663{
 664        struct bio *tmp;
 665
 666        while (chain) {
 667                tmp = chain;
 668                chain = chain->bi_next;
 669                bio_put(tmp);
 670        }
 671}
 672
 673/*
 674 * zeros a bio chain, starting at specific offset
 675 */
 676static void zero_bio_chain(struct bio *chain, int start_ofs)
 677{
 678        struct bio_vec *bv;
 679        unsigned long flags;
 680        void *buf;
 681        int i;
 682        int pos = 0;
 683
 684        while (chain) {
 685                bio_for_each_segment(bv, chain, i) {
 686                        if (pos + bv->bv_len > start_ofs) {
 687                                int remainder = max(start_ofs - pos, 0);
 688                                buf = bvec_kmap_irq(bv, &flags);
 689                                memset(buf + remainder, 0,
 690                                       bv->bv_len - remainder);
 691                                bvec_kunmap_irq(buf, &flags);
 692                        }
 693                        pos += bv->bv_len;
 694                }
 695
 696                chain = chain->bi_next;
 697        }
 698}
 699
 700/*
 701 * bio_chain_clone - clone a chain of bios up to a certain length.
 702 * might return a bio_pair that will need to be released.
 703 */
 704static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 705                                   struct bio_pair **bp,
 706                                   int len, gfp_t gfpmask)
 707{
 708        struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
 709        int total = 0;
 710
 711        if (*bp) {
 712                bio_pair_release(*bp);
 713                *bp = NULL;
 714        }
 715
 716        while (old_chain && (total < len)) {
 717                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
 718                if (!tmp)
 719                        goto err_out;
 720
 721                if (total + old_chain->bi_size > len) {
 722                        struct bio_pair *bp;
 723
 724                        /*
 725                         * this split can only happen with a single paged bio,
 726                         * split_bio will BUG_ON if this is not the case
 727                         */
 728                        dout("bio_chain_clone split! total=%d remaining=%d"
 729                             "bi_size=%d\n",
 730                             (int)total, (int)len-total,
 731                             (int)old_chain->bi_size);
 732
 733                        /* split the bio. We'll release it either in the next
 734                           call, or it will have to be released outside */
 735                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
 736                        if (!bp)
 737                                goto err_out;
 738
 739                        __bio_clone(tmp, &bp->bio1);
 740
 741                        *next = &bp->bio2;
 742                } else {
 743                        __bio_clone(tmp, old_chain);
 744                        *next = old_chain->bi_next;
 745                }
 746
 747                tmp->bi_bdev = NULL;
 748                gfpmask &= ~__GFP_WAIT;
 749                tmp->bi_next = NULL;
 750
 751                if (!new_chain) {
 752                        new_chain = tail = tmp;
 753                } else {
 754                        tail->bi_next = tmp;
 755                        tail = tmp;
 756                }
 757                old_chain = old_chain->bi_next;
 758
 759                total += tmp->bi_size;
 760        }
 761
 762        BUG_ON(total < len);
 763
 764        if (tail)
 765                tail->bi_next = NULL;
 766
 767        *old = old_chain;
 768
 769        return new_chain;
 770
 771err_out:
 772        dout("bio_chain_clone with err\n");
 773        bio_chain_put(new_chain);
 774        return NULL;
 775}
 776
 777/*
 778 * helpers for osd request op vectors.
 779 */
 780static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
 781                            int num_ops,
 782                            int opcode,
 783                            u32 payload_len)
 784{
 785        *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
 786                       GFP_NOIO);
 787        if (!*ops)
 788                return -ENOMEM;
 789        (*ops)[0].op = opcode;
 790        /*
 791         * op extent offset and length will be set later on
 792         * in calc_raw_layout()
 793         */
 794        (*ops)[0].payload_len = payload_len;
 795        return 0;
 796}
 797
 798static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
 799{
 800        kfree(ops);
 801}
 802
 803static void rbd_coll_end_req_index(struct request *rq,
 804                                   struct rbd_req_coll *coll,
 805                                   int index,
 806                                   int ret, u64 len)
 807{
 808        struct request_queue *q;
 809        int min, max, i;
 810
 811        dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
 812             coll, index, ret, len);
 813
 814        if (!rq)
 815                return;
 816
 817        if (!coll) {
 818                blk_end_request(rq, ret, len);
 819                return;
 820        }
 821
 822        q = rq->q;
 823
 824        spin_lock_irq(q->queue_lock);
 825        coll->status[index].done = 1;
 826        coll->status[index].rc = ret;
 827        coll->status[index].bytes = len;
 828        max = min = coll->num_done;
 829        while (max < coll->total && coll->status[max].done)
 830                max++;
 831
 832        for (i = min; i<max; i++) {
 833                __blk_end_request(rq, coll->status[i].rc,
 834                                  coll->status[i].bytes);
 835                coll->num_done++;
 836                kref_put(&coll->kref, rbd_coll_release);
 837        }
 838        spin_unlock_irq(q->queue_lock);
 839}
 840
 841static void rbd_coll_end_req(struct rbd_request *req,
 842                             int ret, u64 len)
 843{
 844        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
 845}
 846
 847/*
 848 * Send ceph osd request
 849 */
 850static int rbd_do_request(struct request *rq,
 851                          struct rbd_device *dev,
 852                          struct ceph_snap_context *snapc,
 853                          u64 snapid,
 854                          const char *obj, u64 ofs, u64 len,
 855                          struct bio *bio,
 856                          struct page **pages,
 857                          int num_pages,
 858                          int flags,
 859                          struct ceph_osd_req_op *ops,
 860                          int num_reply,
 861                          struct rbd_req_coll *coll,
 862                          int coll_index,
 863                          void (*rbd_cb)(struct ceph_osd_request *req,
 864                                         struct ceph_msg *msg),
 865                          struct ceph_osd_request **linger_req,
 866                          u64 *ver)
 867{
 868        struct ceph_osd_request *req;
 869        struct ceph_file_layout *layout;
 870        int ret;
 871        u64 bno;
 872        struct timespec mtime = CURRENT_TIME;
 873        struct rbd_request *req_data;
 874        struct ceph_osd_request_head *reqhead;
 875        struct ceph_osd_client *osdc;
 876
 877        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
 878        if (!req_data) {
 879                if (coll)
 880                        rbd_coll_end_req_index(rq, coll, coll_index,
 881                                               -ENOMEM, len);
 882                return -ENOMEM;
 883        }
 884
 885        if (coll) {
 886                req_data->coll = coll;
 887                req_data->coll_index = coll_index;
 888        }
 889
 890        dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
 891
 892        down_read(&dev->header_rwsem);
 893
 894        osdc = &dev->rbd_client->client->osdc;
 895        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
 896                                        false, GFP_NOIO, pages, bio);
 897        if (!req) {
 898                up_read(&dev->header_rwsem);
 899                ret = -ENOMEM;
 900                goto done_pages;
 901        }
 902
 903        req->r_callback = rbd_cb;
 904
 905        req_data->rq = rq;
 906        req_data->bio = bio;
 907        req_data->pages = pages;
 908        req_data->len = len;
 909
 910        req->r_priv = req_data;
 911
 912        reqhead = req->r_request->front.iov_base;
 913        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 914
 915        strncpy(req->r_oid, obj, sizeof(req->r_oid));
 916        req->r_oid_len = strlen(req->r_oid);
 917
 918        layout = &req->r_file_layout;
 919        memset(layout, 0, sizeof(*layout));
 920        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 921        layout->fl_stripe_count = cpu_to_le32(1);
 922        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 923        layout->fl_pg_pool = cpu_to_le32(dev->poolid);
 924        ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 925                                req, ops);
 926
 927        ceph_osdc_build_request(req, ofs, &len,
 928                                ops,
 929                                snapc,
 930                                &mtime,
 931                                req->r_oid, req->r_oid_len);
 932        up_read(&dev->header_rwsem);
 933
 934        if (linger_req) {
 935                ceph_osdc_set_request_linger(osdc, req);
 936                *linger_req = req;
 937        }
 938
 939        ret = ceph_osdc_start_request(osdc, req, false);
 940        if (ret < 0)
 941                goto done_err;
 942
 943        if (!rbd_cb) {
 944                ret = ceph_osdc_wait_request(osdc, req);
 945                if (ver)
 946                        *ver = le64_to_cpu(req->r_reassert_version.version);
 947                dout("reassert_ver=%lld\n",
 948                     le64_to_cpu(req->r_reassert_version.version));
 949                ceph_osdc_put_request(req);
 950        }
 951        return ret;
 952
 953done_err:
 954        bio_chain_put(req_data->bio);
 955        ceph_osdc_put_request(req);
 956done_pages:
 957        rbd_coll_end_req(req_data, ret, len);
 958        kfree(req_data);
 959        return ret;
 960}
 961
 962/*
 963 * Ceph osd op callback
 964 */
 965static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
 966{
 967        struct rbd_request *req_data = req->r_priv;
 968        struct ceph_osd_reply_head *replyhead;
 969        struct ceph_osd_op *op;
 970        __s32 rc;
 971        u64 bytes;
 972        int read_op;
 973
 974        /* parse reply */
 975        replyhead = msg->front.iov_base;
 976        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
 977        op = (void *)(replyhead + 1);
 978        rc = le32_to_cpu(replyhead->result);
 979        bytes = le64_to_cpu(op->extent.length);
 980        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
 981
 982        dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
 983
 984        if (rc == -ENOENT && read_op) {
 985                zero_bio_chain(req_data->bio, 0);
 986                rc = 0;
 987        } else if (rc == 0 && read_op && bytes < req_data->len) {
 988                zero_bio_chain(req_data->bio, bytes);
 989                bytes = req_data->len;
 990        }
 991
 992        rbd_coll_end_req(req_data, rc, bytes);
 993
 994        if (req_data->bio)
 995                bio_chain_put(req_data->bio);
 996
 997        ceph_osdc_put_request(req);
 998        kfree(req_data);
 999}
1000
/*
 * Minimal completion callback: nothing is parsed from the reply, the
 * only job is to drop the reference on the finished OSD request.
 * Used for the fire-and-forget notify ack.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
                              struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1005
1006/*
1007 * Do a synchronous ceph osd operation
1008 */
1009static int rbd_req_sync_op(struct rbd_device *dev,
1010                           struct ceph_snap_context *snapc,
1011                           u64 snapid,
1012                           int opcode,
1013                           int flags,
1014                           struct ceph_osd_req_op *orig_ops,
1015                           int num_reply,
1016                           const char *obj,
1017                           u64 ofs, u64 len,
1018                           char *buf,
1019                           struct ceph_osd_request **linger_req,
1020                           u64 *ver)
1021{
1022        int ret;
1023        struct page **pages;
1024        int num_pages;
1025        struct ceph_osd_req_op *ops = orig_ops;
1026        u32 payload_len;
1027
1028        num_pages = calc_pages_for(ofs , len);
1029        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1030        if (IS_ERR(pages))
1031                return PTR_ERR(pages);
1032
1033        if (!orig_ops) {
1034                payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1035                ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1036                if (ret < 0)
1037                        goto done;
1038
1039                if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1040                        ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1041                        if (ret < 0)
1042                                goto done_ops;
1043                }
1044        }
1045
1046        ret = rbd_do_request(NULL, dev, snapc, snapid,
1047                          obj, ofs, len, NULL,
1048                          pages, num_pages,
1049                          flags,
1050                          ops,
1051                          2,
1052                          NULL, 0,
1053                          NULL,
1054                          linger_req, ver);
1055        if (ret < 0)
1056                goto done_ops;
1057
1058        if ((flags & CEPH_OSD_FLAG_READ) && buf)
1059                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1060
1061done_ops:
1062        if (!orig_ops)
1063                rbd_destroy_ops(ops);
1064done:
1065        ceph_release_page_vector(pages, num_pages);
1066        return ret;
1067}
1068
1069/*
1070 * Do an asynchronous ceph osd operation
1071 */
1072static int rbd_do_op(struct request *rq,
1073                     struct rbd_device *rbd_dev ,
1074                     struct ceph_snap_context *snapc,
1075                     u64 snapid,
1076                     int opcode, int flags, int num_reply,
1077                     u64 ofs, u64 len,
1078                     struct bio *bio,
1079                     struct rbd_req_coll *coll,
1080                     int coll_index)
1081{
1082        char *seg_name;
1083        u64 seg_ofs;
1084        u64 seg_len;
1085        int ret;
1086        struct ceph_osd_req_op *ops;
1087        u32 payload_len;
1088
1089        seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090        if (!seg_name)
1091                return -ENOMEM;
1092
1093        seg_len = rbd_get_segment(&rbd_dev->header,
1094                                  rbd_dev->header.block_name,
1095                                  ofs, len,
1096                                  seg_name, &seg_ofs);
1097
1098        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099
1100        ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1101        if (ret < 0)
1102                goto done;
1103
1104        /* we've taken care of segment sizes earlier when we
1105           cloned the bios. We should never have a segment
1106           truncated at this point */
1107        BUG_ON(seg_len < len);
1108
1109        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110                             seg_name, seg_ofs, seg_len,
1111                             bio,
1112                             NULL, 0,
1113                             flags,
1114                             ops,
1115                             num_reply,
1116                             coll, coll_index,
1117                             rbd_req_cb, 0, NULL);
1118
1119        rbd_destroy_ops(ops);
1120done:
1121        kfree(seg_name);
1122        return ret;
1123}
1124
1125/*
1126 * Request async osd write
1127 */
1128static int rbd_req_write(struct request *rq,
1129                         struct rbd_device *rbd_dev,
1130                         struct ceph_snap_context *snapc,
1131                         u64 ofs, u64 len,
1132                         struct bio *bio,
1133                         struct rbd_req_coll *coll,
1134                         int coll_index)
1135{
1136        return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137                         CEPH_OSD_OP_WRITE,
1138                         CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139                         2,
1140                         ofs, len, bio, coll, coll_index);
1141}
1142
1143/*
1144 * Request async osd read
1145 */
1146static int rbd_req_read(struct request *rq,
1147                         struct rbd_device *rbd_dev,
1148                         u64 snapid,
1149                         u64 ofs, u64 len,
1150                         struct bio *bio,
1151                         struct rbd_req_coll *coll,
1152                         int coll_index)
1153{
1154        return rbd_do_op(rq, rbd_dev, NULL,
1155                         snapid,
1156                         CEPH_OSD_OP_READ,
1157                         CEPH_OSD_FLAG_READ,
1158                         2,
1159                         ofs, len, bio, coll, coll_index);
1160}
1161
1162/*
1163 * Request sync osd read
1164 */
1165static int rbd_req_sync_read(struct rbd_device *dev,
1166                          struct ceph_snap_context *snapc,
1167                          u64 snapid,
1168                          const char *obj,
1169                          u64 ofs, u64 len,
1170                          char *buf,
1171                          u64 *ver)
1172{
1173        return rbd_req_sync_op(dev, NULL,
1174                               snapid,
1175                               CEPH_OSD_OP_READ,
1176                               CEPH_OSD_FLAG_READ,
1177                               NULL,
1178                               1, obj, ofs, len, buf, NULL, ver);
1179}
1180
1181/*
1182 * Request sync osd watch
1183 */
1184static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1185                                   u64 ver,
1186                                   u64 notify_id,
1187                                   const char *obj)
1188{
1189        struct ceph_osd_req_op *ops;
1190        struct page **pages = NULL;
1191        int ret;
1192
1193        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1194        if (ret < 0)
1195                return ret;
1196
1197        ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1198        ops[0].watch.cookie = notify_id;
1199        ops[0].watch.flag = 0;
1200
1201        ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1202                          obj, 0, 0, NULL,
1203                          pages, 0,
1204                          CEPH_OSD_FLAG_READ,
1205                          ops,
1206                          1,
1207                          NULL, 0,
1208                          rbd_simple_req_cb, 0, NULL);
1209
1210        rbd_destroy_ops(ops);
1211        return ret;
1212}
1213
1214static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1215{
1216        struct rbd_device *dev = (struct rbd_device *)data;
1217        int rc;
1218
1219        if (!dev)
1220                return;
1221
1222        dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1223                notify_id, (int)opcode);
1224        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1225        rc = __rbd_refresh_header(dev);
1226        mutex_unlock(&ctl_mutex);
1227        if (rc)
1228                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1229                           " update snaps: %d\n", dev->major, rc);
1230
1231        rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1232}
1233
1234/*
1235 * Request sync osd watch
1236 */
1237static int rbd_req_sync_watch(struct rbd_device *dev,
1238                              const char *obj,
1239                              u64 ver)
1240{
1241        struct ceph_osd_req_op *ops;
1242        struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1243
1244        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1245        if (ret < 0)
1246                return ret;
1247
1248        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249                                     (void *)dev, &dev->watch_event);
1250        if (ret < 0)
1251                goto fail;
1252
1253        ops[0].watch.ver = cpu_to_le64(ver);
1254        ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1255        ops[0].watch.flag = 1;
1256
1257        ret = rbd_req_sync_op(dev, NULL,
1258                              CEPH_NOSNAP,
1259                              0,
1260                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261                              ops,
1262                              1, obj, 0, 0, NULL,
1263                              &dev->watch_request, NULL);
1264
1265        if (ret < 0)
1266                goto fail_event;
1267
1268        rbd_destroy_ops(ops);
1269        return 0;
1270
1271fail_event:
1272        ceph_osdc_cancel_event(dev->watch_event);
1273        dev->watch_event = NULL;
1274fail:
1275        rbd_destroy_ops(ops);
1276        return ret;
1277}
1278
1279/*
1280 * Request sync osd unwatch
1281 */
1282static int rbd_req_sync_unwatch(struct rbd_device *dev,
1283                                const char *obj)
1284{
1285        struct ceph_osd_req_op *ops;
1286
1287        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1288        if (ret < 0)
1289                return ret;
1290
1291        ops[0].watch.ver = 0;
1292        ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1293        ops[0].watch.flag = 0;
1294
1295        ret = rbd_req_sync_op(dev, NULL,
1296                              CEPH_NOSNAP,
1297                              0,
1298                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299                              ops,
1300                              1, obj, 0, 0, NULL, NULL, NULL);
1301
1302        rbd_destroy_ops(ops);
1303        ceph_osdc_cancel_event(dev->watch_event);
1304        dev->watch_event = NULL;
1305        return ret;
1306}
1307
/*
 * Context handed to the notify event created in rbd_req_sync_notify().
 */
struct rbd_notify_info {
        struct rbd_device *dev;         /* device the notify was sent for */
};
1311
1312static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313{
1314        struct rbd_device *dev = (struct rbd_device *)data;
1315        if (!dev)
1316                return;
1317
1318        dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1319                notify_id, (int)opcode);
1320}
1321
1322/*
1323 * Request sync osd notify
1324 */
1325static int rbd_req_sync_notify(struct rbd_device *dev,
1326                          const char *obj)
1327{
1328        struct ceph_osd_req_op *ops;
1329        struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1330        struct ceph_osd_event *event;
1331        struct rbd_notify_info info;
1332        int payload_len = sizeof(u32) + sizeof(u32);
1333        int ret;
1334
1335        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1336        if (ret < 0)
1337                return ret;
1338
1339        info.dev = dev;
1340
1341        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1342                                     (void *)&info, &event);
1343        if (ret < 0)
1344                goto fail;
1345
1346        ops[0].watch.ver = 1;
1347        ops[0].watch.flag = 1;
1348        ops[0].watch.cookie = event->cookie;
1349        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1350        ops[0].watch.timeout = 12;
1351
1352        ret = rbd_req_sync_op(dev, NULL,
1353                               CEPH_NOSNAP,
1354                               0,
1355                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356                               ops,
1357                               1, obj, 0, 0, NULL, NULL, NULL);
1358        if (ret < 0)
1359                goto fail_event;
1360
1361        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1362        dout("ceph_osdc_wait_event returned %d\n", ret);
1363        rbd_destroy_ops(ops);
1364        return 0;
1365
1366fail_event:
1367        ceph_osdc_cancel_event(event);
1368fail:
1369        rbd_destroy_ops(ops);
1370        return ret;
1371}
1372
1373/*
1374 * Request sync osd read
1375 */
1376static int rbd_req_sync_exec(struct rbd_device *dev,
1377                             const char *obj,
1378                             const char *cls,
1379                             const char *method,
1380                             const char *data,
1381                             int len,
1382                             u64 *ver)
1383{
1384        struct ceph_osd_req_op *ops;
1385        int cls_len = strlen(cls);
1386        int method_len = strlen(method);
1387        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1388                                    cls_len + method_len + len);
1389        if (ret < 0)
1390                return ret;
1391
1392        ops[0].cls.class_name = cls;
1393        ops[0].cls.class_len = (__u8)cls_len;
1394        ops[0].cls.method_name = method;
1395        ops[0].cls.method_len = (__u8)method_len;
1396        ops[0].cls.argc = 0;
1397        ops[0].cls.indata = data;
1398        ops[0].cls.indata_len = len;
1399
1400        ret = rbd_req_sync_op(dev, NULL,
1401                               CEPH_NOSNAP,
1402                               0,
1403                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1404                               ops,
1405                               1, obj, 0, 0, NULL, NULL, ver);
1406
1407        rbd_destroy_ops(ops);
1408
1409        dout("cls_exec returned %d\n", ret);
1410        return ret;
1411}
1412
1413static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1414{
1415        struct rbd_req_coll *coll =
1416                        kzalloc(sizeof(struct rbd_req_coll) +
1417                                sizeof(struct rbd_req_status) * num_reqs,
1418                                GFP_ATOMIC);
1419
1420        if (!coll)
1421                return NULL;
1422        coll->total = num_reqs;
1423        kref_init(&coll->kref);
1424        return coll;
1425}
1426
1427/*
1428 * block device queue callback
1429 */
1430static void rbd_rq_fn(struct request_queue *q)
1431{
1432        struct rbd_device *rbd_dev = q->queuedata;
1433        struct request *rq;
1434        struct bio_pair *bp = NULL;
1435
1436        while ((rq = blk_fetch_request(q))) {
1437                struct bio *bio;
1438                struct bio *rq_bio, *next_bio = NULL;
1439                bool do_write;
1440                int size, op_size = 0;
1441                u64 ofs;
1442                int num_segs, cur_seg = 0;
1443                struct rbd_req_coll *coll;
1444
1445                /* peek at request from block layer */
1446                if (!rq)
1447                        break;
1448
1449                dout("fetched request\n");
1450
1451                /* filter out block requests we don't understand */
1452                if ((rq->cmd_type != REQ_TYPE_FS)) {
1453                        __blk_end_request_all(rq, 0);
1454                        continue;
1455                }
1456
1457                /* deduce our operation (read, write) */
1458                do_write = (rq_data_dir(rq) == WRITE);
1459
1460                size = blk_rq_bytes(rq);
1461                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1462                rq_bio = rq->bio;
1463                if (do_write && rbd_dev->read_only) {
1464                        __blk_end_request_all(rq, -EROFS);
1465                        continue;
1466                }
1467
1468                spin_unlock_irq(q->queue_lock);
1469
1470                dout("%s 0x%x bytes at 0x%llx\n",
1471                     do_write ? "write" : "read",
1472                     size, blk_rq_pos(rq) * SECTOR_SIZE);
1473
1474                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1475                coll = rbd_alloc_coll(num_segs);
1476                if (!coll) {
1477                        spin_lock_irq(q->queue_lock);
1478                        __blk_end_request_all(rq, -ENOMEM);
1479                        continue;
1480                }
1481
1482                do {
1483                        /* a bio clone to be passed down to OSD req */
1484                        dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1485                        op_size = rbd_get_segment(&rbd_dev->header,
1486                                                  rbd_dev->header.block_name,
1487                                                  ofs, size,
1488                                                  NULL, NULL);
1489                        kref_get(&coll->kref);
1490                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1491                                              op_size, GFP_ATOMIC);
1492                        if (!bio) {
1493                                rbd_coll_end_req_index(rq, coll, cur_seg,
1494                                                       -ENOMEM, op_size);
1495                                goto next_seg;
1496                        }
1497
1498
1499                        /* init OSD command: write or read */
1500                        if (do_write)
1501                                rbd_req_write(rq, rbd_dev,
1502                                              rbd_dev->header.snapc,
1503                                              ofs,
1504                                              op_size, bio,
1505                                              coll, cur_seg);
1506                        else
1507                                rbd_req_read(rq, rbd_dev,
1508                                             rbd_dev->snap_id,
1509                                             ofs,
1510                                             op_size, bio,
1511                                             coll, cur_seg);
1512
1513next_seg:
1514                        size -= op_size;
1515                        ofs += op_size;
1516
1517                        cur_seg++;
1518                        rq_bio = next_bio;
1519                } while (size > 0);
1520                kref_put(&coll->kref, rbd_coll_release);
1521
1522                if (bp)
1523                        bio_pair_release(bp);
1524                spin_lock_irq(q->queue_lock);
1525        }
1526}
1527
1528/*
1529 * a queue callback. Makes sure that we don't create a bio that spans across
1530 * multiple osd objects. One exception would be with a single page bios,
1531 * which we handle later at bio_chain_clone
1532 */
1533static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1534                          struct bio_vec *bvec)
1535{
1536        struct rbd_device *rbd_dev = q->queuedata;
1537        unsigned int chunk_sectors;
1538        sector_t sector;
1539        unsigned int bio_sectors;
1540        int max;
1541
1542        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1543        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1544        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1545
1546        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1547                                 + bio_sectors)) << SECTOR_SHIFT;
1548        if (max < 0)
1549                max = 0; /* bio_add cannot handle a negative return */
1550        if (max <= bvec->bv_len && bio_sectors == 0)
1551                return bvec->bv_len;
1552        return max;
1553}
1554
1555static void rbd_free_disk(struct rbd_device *rbd_dev)
1556{
1557        struct gendisk *disk = rbd_dev->disk;
1558
1559        if (!disk)
1560                return;
1561
1562        rbd_header_free(&rbd_dev->header);
1563
1564        if (disk->flags & GENHD_FL_UP)
1565                del_gendisk(disk);
1566        if (disk->queue)
1567                blk_cleanup_queue(disk->queue);
1568        put_disk(disk);
1569}
1570
1571/*
1572 * reload the ondisk the header 
1573 */
1574static int rbd_read_header(struct rbd_device *rbd_dev,
1575                           struct rbd_image_header *header)
1576{
1577        ssize_t rc;
1578        struct rbd_image_header_ondisk *dh;
1579        u32 snap_count = 0;
1580        u64 ver;
1581        size_t len;
1582
1583        /*
1584         * First reads the fixed-size header to determine the number
1585         * of snapshots, then re-reads it, along with all snapshot
1586         * records as well as their stored names.
1587         */
1588        len = sizeof (*dh);
1589        while (1) {
1590                dh = kmalloc(len, GFP_KERNEL);
1591                if (!dh)
1592                        return -ENOMEM;
1593
1594                rc = rbd_req_sync_read(rbd_dev,
1595                                       NULL, CEPH_NOSNAP,
1596                                       rbd_dev->obj_md_name,
1597                                       0, len,
1598                                       (char *)dh, &ver);
1599                if (rc < 0)
1600                        goto out_dh;
1601
1602                rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1603                if (rc < 0) {
1604                        if (rc == -ENXIO)
1605                                pr_warning("unrecognized header format"
1606                                           " for image %s", rbd_dev->obj);
1607                        goto out_dh;
1608                }
1609
1610                if (snap_count == header->total_snaps)
1611                        break;
1612
1613                snap_count = header->total_snaps;
1614                len = sizeof (*dh) +
1615                        snap_count * sizeof(struct rbd_image_snap_ondisk) +
1616                        header->snap_names_len;
1617
1618                rbd_header_free(header);
1619                kfree(dh);
1620        }
1621        header->obj_version = ver;
1622
1623out_dh:
1624        kfree(dh);
1625        return rc;
1626}
1627
1628/*
1629 * create a snapshot
1630 */
1631static int rbd_header_add_snap(struct rbd_device *dev,
1632                               const char *snap_name,
1633                               gfp_t gfp_flags)
1634{
1635        int name_len = strlen(snap_name);
1636        u64 new_snapid;
1637        int ret;
1638        void *data, *p, *e;
1639        u64 ver;
1640        struct ceph_mon_client *monc;
1641
1642        /* we should create a snapshot only if we're pointing at the head */
1643        if (dev->snap_id != CEPH_NOSNAP)
1644                return -EINVAL;
1645
1646        monc = &dev->rbd_client->client->monc;
1647        ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1648        dout("created snapid=%lld\n", new_snapid);
1649        if (ret < 0)
1650                return ret;
1651
1652        data = kmalloc(name_len + 16, gfp_flags);
1653        if (!data)
1654                return -ENOMEM;
1655
1656        p = data;
1657        e = data + name_len + 16;
1658
1659        ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1660        ceph_encode_64_safe(&p, e, new_snapid, bad);
1661
1662        ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1663                                data, p - data, &ver);
1664
1665        kfree(data);
1666
1667        if (ret < 0)
1668                return ret;
1669
1670        down_write(&dev->header_rwsem);
1671        dev->header.snapc->seq = new_snapid;
1672        up_write(&dev->header_rwsem);
1673
1674        return 0;
1675bad:
1676        return -ERANGE;
1677}
1678
1679static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1680{
1681        struct rbd_snap *snap;
1682
1683        while (!list_empty(&rbd_dev->snaps)) {
1684                snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1685                __rbd_remove_snap_dev(rbd_dev, snap);
1686        }
1687}
1688
1689/*
1690 * only read the first part of the ondisk header, without the snaps info
1691 */
1692static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1693{
1694        int ret;
1695        struct rbd_image_header h;
1696        u64 snap_seq;
1697        int follow_seq = 0;
1698
1699        ret = rbd_read_header(rbd_dev, &h);
1700        if (ret < 0)
1701                return ret;
1702
1703        /* resized? */
1704        set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1705
1706        down_write(&rbd_dev->header_rwsem);
1707
1708        snap_seq = rbd_dev->header.snapc->seq;
1709        if (rbd_dev->header.total_snaps &&
1710            rbd_dev->header.snapc->snaps[0] == snap_seq)
1711                /* pointing at the head, will need to follow that
1712                   if head moves */
1713                follow_seq = 1;
1714
1715        kfree(rbd_dev->header.snapc);
1716        kfree(rbd_dev->header.snap_names);
1717        kfree(rbd_dev->header.snap_sizes);
1718
1719        rbd_dev->header.total_snaps = h.total_snaps;
1720        rbd_dev->header.snapc = h.snapc;
1721        rbd_dev->header.snap_names = h.snap_names;
1722        rbd_dev->header.snap_names_len = h.snap_names_len;
1723        rbd_dev->header.snap_sizes = h.snap_sizes;
1724        if (follow_seq)
1725                rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1726        else
1727                rbd_dev->header.snapc->seq = snap_seq;
1728
1729        ret = __rbd_init_snaps_header(rbd_dev);
1730
1731        up_write(&rbd_dev->header_rwsem);
1732
1733        return ret;
1734}
1735
1736static int rbd_init_disk(struct rbd_device *rbd_dev)
1737{
1738        struct gendisk *disk;
1739        struct request_queue *q;
1740        int rc;
1741        u64 segment_size;
1742        u64 total_size = 0;
1743
1744        /* contact OSD, request size info about the object being mapped */
1745        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1746        if (rc)
1747                return rc;
1748
1749        /* no need to lock here, as rbd_dev is not registered yet */
1750        rc = __rbd_init_snaps_header(rbd_dev);
1751        if (rc)
1752                return rc;
1753
1754        rc = rbd_header_set_snap(rbd_dev, &total_size);
1755        if (rc)
1756                return rc;
1757
1758        /* create gendisk info */
1759        rc = -ENOMEM;
1760        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1761        if (!disk)
1762                goto out;
1763
1764        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1765                 rbd_dev->id);
1766        disk->major = rbd_dev->major;
1767        disk->first_minor = 0;
1768        disk->fops = &rbd_bd_ops;
1769        disk->private_data = rbd_dev;
1770
1771        /* init rq */
1772        rc = -ENOMEM;
1773        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1774        if (!q)
1775                goto out_disk;
1776
1777        /* We use the default size, but let's be explicit about it. */
1778        blk_queue_physical_block_size(q, SECTOR_SIZE);
1779
1780        /* set io sizes to object size */
1781        segment_size = rbd_obj_bytes(&rbd_dev->header);
1782        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1783        blk_queue_max_segment_size(q, segment_size);
1784        blk_queue_io_min(q, segment_size);
1785        blk_queue_io_opt(q, segment_size);
1786
1787        blk_queue_merge_bvec(q, rbd_merge_bvec);
1788        disk->queue = q;
1789
1790        q->queuedata = rbd_dev;
1791
1792        rbd_dev->disk = disk;
1793        rbd_dev->q = q;
1794
1795        /* finally, announce the disk to the world */
1796        set_capacity(disk, total_size / SECTOR_SIZE);
1797        add_disk(disk);
1798
1799        pr_info("%s: added with size 0x%llx\n",
1800                disk->disk_name, (unsigned long long)total_size);
1801        return 0;
1802
1803out_disk:
1804        put_disk(disk);
1805out:
1806        return rc;
1807}
1808
1809/*
1810  sysfs
1811*/
1812
1813static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1814{
1815        return container_of(dev, struct rbd_device, dev);
1816}
1817
1818static ssize_t rbd_size_show(struct device *dev,
1819                             struct device_attribute *attr, char *buf)
1820{
1821        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1822
1823        return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1824}
1825
1826static ssize_t rbd_major_show(struct device *dev,
1827                              struct device_attribute *attr, char *buf)
1828{
1829        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1830
1831        return sprintf(buf, "%d\n", rbd_dev->major);
1832}
1833
1834static ssize_t rbd_client_id_show(struct device *dev,
1835                                  struct device_attribute *attr, char *buf)
1836{
1837        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1838
1839        return sprintf(buf, "client%lld\n",
1840                        ceph_client_id(rbd_dev->rbd_client->client));
1841}
1842
1843static ssize_t rbd_pool_show(struct device *dev,
1844                             struct device_attribute *attr, char *buf)
1845{
1846        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1847
1848        return sprintf(buf, "%s\n", rbd_dev->pool_name);
1849}
1850
1851static ssize_t rbd_name_show(struct device *dev,
1852                             struct device_attribute *attr, char *buf)
1853{
1854        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855
1856        return sprintf(buf, "%s\n", rbd_dev->obj);
1857}
1858
1859static ssize_t rbd_snap_show(struct device *dev,
1860                             struct device_attribute *attr,
1861                             char *buf)
1862{
1863        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1864
1865        return sprintf(buf, "%s\n", rbd_dev->snap_name);
1866}
1867
1868static ssize_t rbd_image_refresh(struct device *dev,
1869                                 struct device_attribute *attr,
1870                                 const char *buf,
1871                                 size_t size)
1872{
1873        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874        int rc;
1875        int ret = size;
1876
1877        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1878
1879        rc = __rbd_refresh_header(rbd_dev);
1880        if (rc < 0)
1881                ret = rc;
1882
1883        mutex_unlock(&ctl_mutex);
1884        return ret;
1885}
1886
1887static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1888static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1889static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1890static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1891static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1892static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1893static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1894static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1895
1896static struct attribute *rbd_attrs[] = {
1897        &dev_attr_size.attr,
1898        &dev_attr_major.attr,
1899        &dev_attr_client_id.attr,
1900        &dev_attr_pool.attr,
1901        &dev_attr_name.attr,
1902        &dev_attr_current_snap.attr,
1903        &dev_attr_refresh.attr,
1904        &dev_attr_create_snap.attr,
1905        NULL
1906};
1907
1908static struct attribute_group rbd_attr_group = {
1909        .attrs = rbd_attrs,
1910};
1911
1912static const struct attribute_group *rbd_attr_groups[] = {
1913        &rbd_attr_group,
1914        NULL
1915};
1916
/*
 * Release callback installed in rbd_device_type.  It does nothing;
 * rbd_bus_add_dev() installs rbd_dev_release() on the device itself.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1920
1921static struct device_type rbd_device_type = {
1922        .name           = "rbd",
1923        .groups         = rbd_attr_groups,
1924        .release        = rbd_sysfs_dev_release,
1925};
1926
1927
1928/*
1929  sysfs - snapshots
1930*/
1931
1932static ssize_t rbd_snap_size_show(struct device *dev,
1933                                  struct device_attribute *attr,
1934                                  char *buf)
1935{
1936        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1937
1938        return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1939}
1940
1941static ssize_t rbd_snap_id_show(struct device *dev,
1942                                struct device_attribute *attr,
1943                                char *buf)
1944{
1945        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1946
1947        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1948}
1949
1950static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1951static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1952
1953static struct attribute *rbd_snap_attrs[] = {
1954        &dev_attr_snap_size.attr,
1955        &dev_attr_snap_id.attr,
1956        NULL,
1957};
1958
1959static struct attribute_group rbd_snap_attr_group = {
1960        .attrs = rbd_snap_attrs,
1961};
1962
1963static void rbd_snap_dev_release(struct device *dev)
1964{
1965        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966        kfree(snap->name);
1967        kfree(snap);
1968}
1969
1970static const struct attribute_group *rbd_snap_attr_groups[] = {
1971        &rbd_snap_attr_group,
1972        NULL
1973};
1974
1975static struct device_type rbd_snap_device_type = {
1976        .groups         = rbd_snap_attr_groups,
1977        .release        = rbd_snap_dev_release,
1978};
1979
1980static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1981                                  struct rbd_snap *snap)
1982{
1983        list_del(&snap->node);
1984        device_unregister(&snap->dev);
1985}
1986
1987static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1988                                  struct rbd_snap *snap,
1989                                  struct device *parent)
1990{
1991        struct device *dev = &snap->dev;
1992        int ret;
1993
1994        dev->type = &rbd_snap_device_type;
1995        dev->parent = parent;
1996        dev->release = rbd_snap_dev_release;
1997        dev_set_name(dev, "snap_%s", snap->name);
1998        ret = device_register(dev);
1999
2000        return ret;
2001}
2002
2003static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2004                              int i, const char *name,
2005                              struct rbd_snap **snapp)
2006{
2007        int ret;
2008        struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2009        if (!snap)
2010                return -ENOMEM;
2011        snap->name = kstrdup(name, GFP_KERNEL);
2012        snap->size = rbd_dev->header.snap_sizes[i];
2013        snap->id = rbd_dev->header.snapc->snaps[i];
2014        if (device_is_registered(&rbd_dev->dev)) {
2015                ret = rbd_register_snap_dev(rbd_dev, snap,
2016                                             &rbd_dev->dev);
2017                if (ret < 0)
2018                        goto err;
2019        }
2020        *snapp = snap;
2021        return 0;
2022err:
2023        kfree(snap->name);
2024        kfree(snap);
2025        return ret;
2026}
2027
/*
 * Step backward through a list of '\0'-terminated names stored back
 * to back.
 *
 * name points at the start of an entry (or just past the last
 * terminator); start is the beginning of the list.  Returns the start
 * of the entry preceding name, or NULL if fewer than two bytes
 * precede it.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
        const char *cursor;

        if (name < start + 2)
                return NULL;

        /*
         * name - 1 is the previous entry's terminator, so begin at
         * name - 2 and walk back until the terminator before that
         * entry, or the start of the list, is reached.
         */
        for (cursor = name - 2; *cursor != '\0'; cursor--) {
                if (cursor == start)
                        return start;
        }

        return cursor + 1;
}
2044
2045/*
2046 * compare the old list of snapshots that we have to what's in the header
2047 * and update it accordingly. Note that the header holds the snapshots
2048 * in a reverse order (from newest to oldest) and we need to go from
2049 * older to new so that we don't get a duplicate snap name when
2050 * doing the process (e.g., removed snapshot and recreated a new
2051 * one with the same name.
2052 */
2053static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2054{
2055        const char *name, *first_name;
2056        int i = rbd_dev->header.total_snaps;
2057        struct rbd_snap *snap, *old_snap = NULL;
2058        int ret;
2059        struct list_head *p, *n;
2060
2061        first_name = rbd_dev->header.snap_names;
2062        name = first_name + rbd_dev->header.snap_names_len;
2063
2064        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2065                u64 cur_id;
2066
2067                old_snap = list_entry(p, struct rbd_snap, node);
2068
2069                if (i)
2070                        cur_id = rbd_dev->header.snapc->snaps[i - 1];
2071
2072                if (!i || old_snap->id < cur_id) {
2073                        /* old_snap->id was skipped, thus was removed */
2074                        __rbd_remove_snap_dev(rbd_dev, old_snap);
2075                        continue;
2076                }
2077                if (old_snap->id == cur_id) {
2078                        /* we have this snapshot already */
2079                        i--;
2080                        name = rbd_prev_snap_name(name, first_name);
2081                        continue;
2082                }
2083                for (; i > 0;
2084                     i--, name = rbd_prev_snap_name(name, first_name)) {
2085                        if (!name) {
2086                                WARN_ON(1);
2087                                return -EINVAL;
2088                        }
2089                        cur_id = rbd_dev->header.snapc->snaps[i];
2090                        /* snapshot removal? handle it above */
2091                        if (cur_id >= old_snap->id)
2092                                break;
2093                        /* a new snapshot */
2094                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2095                        if (ret < 0)
2096                                return ret;
2097
2098                        /* note that we add it backward so using n and not p */
2099                        list_add(&snap->node, n);
2100                        p = &snap->node;
2101                }
2102        }
2103        /* we're done going over the old snap list, just add what's left */
2104        for (; i > 0; i--) {
2105                name = rbd_prev_snap_name(name, first_name);
2106                if (!name) {
2107                        WARN_ON(1);
2108                        return -EINVAL;
2109                }
2110                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2111                if (ret < 0)
2112                        return ret;
2113                list_add(&snap->node, &rbd_dev->snaps);
2114        }
2115
2116        return 0;
2117}
2118
2119static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2120{
2121        int ret;
2122        struct device *dev;
2123        struct rbd_snap *snap;
2124
2125        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2126        dev = &rbd_dev->dev;
2127
2128        dev->bus = &rbd_bus_type;
2129        dev->type = &rbd_device_type;
2130        dev->parent = &rbd_root_dev;
2131        dev->release = rbd_dev_release;
2132        dev_set_name(dev, "%d", rbd_dev->id);
2133        ret = device_register(dev);
2134        if (ret < 0)
2135                goto out;
2136
2137        list_for_each_entry(snap, &rbd_dev->snaps, node) {
2138                ret = rbd_register_snap_dev(rbd_dev, snap,
2139                                             &rbd_dev->dev);
2140                if (ret < 0)
2141                        break;
2142        }
2143out:
2144        mutex_unlock(&ctl_mutex);
2145        return ret;
2146}
2147
2148static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2149{
2150        device_unregister(&rbd_dev->dev);
2151}
2152
2153static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154{
2155        int ret, rc;
2156
2157        do {
2158                ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2159                                         rbd_dev->header.obj_version);
2160                if (ret == -ERANGE) {
2161                        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2162                        rc = __rbd_refresh_header(rbd_dev);
2163                        mutex_unlock(&ctl_mutex);
2164                        if (rc < 0)
2165                                return rc;
2166                }
2167        } while (ret == -ERANGE);
2168
2169        return ret;
2170}
2171
2172static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2173
2174/*
2175 * Get a unique rbd identifier for the given new rbd_dev, and add
2176 * the rbd_dev to the global list.  The minimum rbd id is 1.
2177 */
2178static void rbd_id_get(struct rbd_device *rbd_dev)
2179{
2180        rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2181
2182        spin_lock(&rbd_dev_list_lock);
2183        list_add_tail(&rbd_dev->node, &rbd_dev_list);
2184        spin_unlock(&rbd_dev_list_lock);
2185}
2186
2187/*
2188 * Remove an rbd_dev from the global list, and record that its
2189 * identifier is no longer in use.
2190 */
2191static void rbd_id_put(struct rbd_device *rbd_dev)
2192{
2193        struct list_head *tmp;
2194        int rbd_id = rbd_dev->id;
2195        int max_id;
2196
2197        BUG_ON(rbd_id < 1);
2198
2199        spin_lock(&rbd_dev_list_lock);
2200        list_del_init(&rbd_dev->node);
2201
2202        /*
2203         * If the id being "put" is not the current maximum, there
2204         * is nothing special we need to do.
2205         */
2206        if (rbd_id != atomic64_read(&rbd_id_max)) {
2207                spin_unlock(&rbd_dev_list_lock);
2208                return;
2209        }
2210
2211        /*
2212         * We need to update the current maximum id.  Search the
2213         * list to find out what it is.  We're more likely to find
2214         * the maximum at the end, so search the list backward.
2215         */
2216        max_id = 0;
2217        list_for_each_prev(tmp, &rbd_dev_list) {
2218                struct rbd_device *rbd_dev;
2219
2220                rbd_dev = list_entry(tmp, struct rbd_device, node);
2221                if (rbd_id > max_id)
2222                        max_id = rbd_id;
2223        }
2224        spin_unlock(&rbd_dev_list_lock);
2225
2226        /*
2227         * The max id could have been updated by rbd_id_get(), in
2228         * which case it now accurately reflects the new maximum.
2229         * Be careful not to overwrite the maximum value in that
2230         * case.
2231         */
2232        atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2233}
2234
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token -- the run of non-space
 * characters -- that starts there.  *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the "C"
         * and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *token = *buf;

        token += strspn(token, whitespace);
        *buf = token;

        return strcspn(token, whitespace);
}
2253
/*
 * Locate the next token in *buf and, when it fits, copy it into the
 * token buffer as a '\0'-terminated string.  *buf must itself be
 * '\0'-terminated on entry.
 *
 * Returns the length of the token found, excluding the '\0': 0 when
 * there is no token, and a value >= token_size when the token did
 * not fit (in which case the token buffer is left untouched).
 *
 * In every case *buf is advanced to just past the token found.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len = next_token(buf);
        const char *found = *buf;

        *buf = found + len;

        if (len >= token_size)
                return len;

        memcpy(token, found, len);
        token[len] = '\0';

        return len;
}
2283
2284/*
2285 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2286 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2287 * on the list of monitor addresses and other options provided via
2288 * /sys/bus/rbd/add.
2289 */
2290static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2291                              const char *buf,
2292                              const char **mon_addrs,
2293                              size_t *mon_addrs_size,
2294                              char *options,
2295                              size_t options_size)
2296{
2297        size_t  len;
2298
2299        /* The first four tokens are required */
2300
2301        len = next_token(&buf);
2302        if (!len)
2303                return -EINVAL;
2304        *mon_addrs_size = len + 1;
2305        *mon_addrs = buf;
2306
2307        buf += len;
2308
2309        len = copy_token(&buf, options, options_size);
2310        if (!len || len >= options_size)
2311                return -EINVAL;
2312
2313        len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2314        if (!len || len >= sizeof (rbd_dev->pool_name))
2315                return -EINVAL;
2316
2317        len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2318        if (!len || len >= sizeof (rbd_dev->obj))
2319                return -EINVAL;
2320
2321        /* We have the object length in hand, save it. */
2322
2323        rbd_dev->obj_len = len;
2324
2325        BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2326                                < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2327        sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2328
2329        /*
2330         * The snapshot name is optional, but it's an error if it's
2331         * too long.  If no snapshot is supplied, fill in the default.
2332         */
2333        len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2334        if (!len)
2335                memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2336                        sizeof (RBD_SNAP_HEAD_NAME));
2337        else if (len >= sizeof (rbd_dev->snap_name))
2338                return -EINVAL;
2339
2340        return 0;
2341}
2342
2343static ssize_t rbd_add(struct bus_type *bus,
2344                       const char *buf,
2345                       size_t count)
2346{
2347        struct rbd_device *rbd_dev;
2348        const char *mon_addrs = NULL;
2349        size_t mon_addrs_size = 0;
2350        char *options = NULL;
2351        struct ceph_osd_client *osdc;
2352        int rc = -ENOMEM;
2353
2354        if (!try_module_get(THIS_MODULE))
2355                return -ENODEV;
2356
2357        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2358        if (!rbd_dev)
2359                goto err_nomem;
2360        options = kmalloc(count, GFP_KERNEL);
2361        if (!options)
2362                goto err_nomem;
2363
2364        /* static rbd_device initialization */
2365        spin_lock_init(&rbd_dev->lock);
2366        INIT_LIST_HEAD(&rbd_dev->node);
2367        INIT_LIST_HEAD(&rbd_dev->snaps);
2368        init_rwsem(&rbd_dev->header_rwsem);
2369
2370        init_rwsem(&rbd_dev->header_rwsem);
2371
2372        /* generate unique id: find highest unique id, add one */
2373        rbd_id_get(rbd_dev);
2374
2375        /* Fill in the device name, now that we have its id. */
2376        BUILD_BUG_ON(DEV_NAME_LEN
2377                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2378        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2379
2380        /* parse add command */
2381        rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2382                                options, count);
2383        if (rc)
2384                goto err_put_id;
2385
2386        rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2387                                                options);
2388        if (IS_ERR(rbd_dev->rbd_client)) {
2389                rc = PTR_ERR(rbd_dev->rbd_client);
2390                goto err_put_id;
2391        }
2392
2393        /* pick the pool */
2394        osdc = &rbd_dev->rbd_client->client->osdc;
2395        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2396        if (rc < 0)
2397                goto err_out_client;
2398        rbd_dev->poolid = rc;
2399
2400        /* register our block device */
2401        rc = register_blkdev(0, rbd_dev->name);
2402        if (rc < 0)
2403                goto err_out_client;
2404        rbd_dev->major = rc;
2405
2406        rc = rbd_bus_add_dev(rbd_dev);
2407        if (rc)
2408                goto err_out_blkdev;
2409
2410        /*
2411         * At this point cleanup in the event of an error is the job
2412         * of the sysfs code (initiated by rbd_bus_del_dev()).
2413         *
2414         * Set up and announce blkdev mapping.
2415         */
2416        rc = rbd_init_disk(rbd_dev);
2417        if (rc)
2418                goto err_out_bus;
2419
2420        rc = rbd_init_watch_dev(rbd_dev);
2421        if (rc)
2422                goto err_out_bus;
2423
2424        return count;
2425
2426err_out_bus:
2427        /* this will also clean up rest of rbd_dev stuff */
2428
2429        rbd_bus_del_dev(rbd_dev);
2430        kfree(options);
2431        return rc;
2432
2433err_out_blkdev:
2434        unregister_blkdev(rbd_dev->major, rbd_dev->name);
2435err_out_client:
2436        rbd_put_client(rbd_dev);
2437err_put_id:
2438        rbd_id_put(rbd_dev);
2439err_nomem:
2440        kfree(options);
2441        kfree(rbd_dev);
2442
2443        dout("Error adding device %s\n", buf);
2444        module_put(THIS_MODULE);
2445
2446        return (ssize_t) rc;
2447}
2448
2449static struct rbd_device *__rbd_get_dev(unsigned long id)
2450{
2451        struct list_head *tmp;
2452        struct rbd_device *rbd_dev;
2453
2454        spin_lock(&rbd_dev_list_lock);
2455        list_for_each(tmp, &rbd_dev_list) {
2456                rbd_dev = list_entry(tmp, struct rbd_device, node);
2457                if (rbd_dev->id == id) {
2458                        spin_unlock(&rbd_dev_list_lock);
2459                        return rbd_dev;
2460                }
2461        }
2462        spin_unlock(&rbd_dev_list_lock);
2463        return NULL;
2464}
2465
2466static void rbd_dev_release(struct device *dev)
2467{
2468        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469
2470        if (rbd_dev->watch_request) {
2471                struct ceph_client *client = rbd_dev->rbd_client->client;
2472
2473                ceph_osdc_unregister_linger_request(&client->osdc,
2474                                                    rbd_dev->watch_request);
2475        }
2476        if (rbd_dev->watch_event)
2477                rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2478
2479        rbd_put_client(rbd_dev);
2480
2481        /* clean up and free blkdev */
2482        rbd_free_disk(rbd_dev);
2483        unregister_blkdev(rbd_dev->major, rbd_dev->name);
2484
2485        /* done with the id, and with the rbd_dev */
2486        rbd_id_put(rbd_dev);
2487        kfree(rbd_dev);
2488
2489        /* release module ref */
2490        module_put(THIS_MODULE);
2491}
2492
2493static ssize_t rbd_remove(struct bus_type *bus,
2494                          const char *buf,
2495                          size_t count)
2496{
2497        struct rbd_device *rbd_dev = NULL;
2498        int target_id, rc;
2499        unsigned long ul;
2500        int ret = count;
2501
2502        rc = strict_strtoul(buf, 10, &ul);
2503        if (rc)
2504                return rc;
2505
2506        /* convert to int; abort if we lost anything in the conversion */
2507        target_id = (int) ul;
2508        if (target_id != ul)
2509                return -EINVAL;
2510
2511        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2512
2513        rbd_dev = __rbd_get_dev(target_id);
2514        if (!rbd_dev) {
2515                ret = -ENOENT;
2516                goto done;
2517        }
2518
2519        __rbd_remove_all_snaps(rbd_dev);
2520        rbd_bus_del_dev(rbd_dev);
2521
2522done:
2523        mutex_unlock(&ctl_mutex);
2524        return ret;
2525}
2526
2527static ssize_t rbd_snap_add(struct device *dev,
2528                            struct device_attribute *attr,
2529                            const char *buf,
2530                            size_t count)
2531{
2532        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2533        int ret;
2534        char *name = kmalloc(count + 1, GFP_KERNEL);
2535        if (!name)
2536                return -ENOMEM;
2537
2538        snprintf(name, count, "%s", buf);
2539
2540        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2541
2542        ret = rbd_header_add_snap(rbd_dev,
2543                                  name, GFP_KERNEL);
2544        if (ret < 0)
2545                goto err_unlock;
2546
2547        ret = __rbd_refresh_header(rbd_dev);
2548        if (ret < 0)
2549                goto err_unlock;
2550
2551        /* shouldn't hold ctl_mutex when notifying.. notify might
2552           trigger a watch callback that would need to get that mutex */
2553        mutex_unlock(&ctl_mutex);
2554
2555        /* make a best effort, don't error if failed */
2556        rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2557
2558        ret = count;
2559        kfree(name);
2560        return ret;
2561
2562err_unlock:
2563        mutex_unlock(&ctl_mutex);
2564        kfree(name);
2565        return ret;
2566}
2567
2568/*
2569 * create control files in sysfs
2570 * /sys/bus/rbd/...
2571 */
2572static int rbd_sysfs_init(void)
2573{
2574        int ret;
2575
2576        ret = device_register(&rbd_root_dev);
2577        if (ret < 0)
2578                return ret;
2579
2580        ret = bus_register(&rbd_bus_type);
2581        if (ret < 0)
2582                device_unregister(&rbd_root_dev);
2583
2584        return ret;
2585}
2586
2587static void rbd_sysfs_cleanup(void)
2588{
2589        bus_unregister(&rbd_bus_type);
2590        device_unregister(&rbd_root_dev);
2591}
2592
2593int __init rbd_init(void)
2594{
2595        int rc;
2596
2597        rc = rbd_sysfs_init();
2598        if (rc)
2599                return rc;
2600        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2601        return 0;
2602}
2603
2604void __exit rbd_exit(void)
2605{
2606        rbd_sysfs_cleanup();
2607}
2608
2609module_init(rbd_init);
2610module_exit(rbd_exit);
2611
2612MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2613MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2614MODULE_DESCRIPTION("rados block device");
2615
2616/* following authorship retained from original osdblk.c */
2617MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2618
2619MODULE_LICENSE("GPL");
2620