linux/drivers/block/rbd.c
<<
>>
Prefs
   1/*
   2   rbd.c -- Export ceph rados objects as a Linux block device
   3
   4
   5   based on drivers/block/osdblk.c:
   6
   7   Copyright 2009 Red Hat, Inc.
   8
   9   This program is free software; you can redistribute it and/or modify
  10   it under the terms of the GNU General Public License as published by
  11   the Free Software Foundation.
  12
  13   This program is distributed in the hope that it will be useful,
  14   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16   GNU General Public License for more details.
  17
  18   You should have received a copy of the GNU General Public License
  19   along with this program; see the file COPYING.  If not, write to
  20   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  21
  22
  23
  24   For usage instructions, please refer to:
  25
  26                 Documentation/ABI/testing/sysfs-bus-rbd
  27
  28 */
  29
  30#include <linux/ceph/libceph.h>
  31#include <linux/ceph/osd_client.h>
  32#include <linux/ceph/mon_client.h>
  33#include <linux/ceph/decode.h>
  34#include <linux/parser.h>
  35
  36#include <linux/kernel.h>
  37#include <linux/device.h>
  38#include <linux/module.h>
  39#include <linux/fs.h>
  40#include <linux/blkdev.h>
  41
  42#include "rbd_types.h"
  43
  44/*
  45 * The basic unit of block I/O is a sector.  It is interpreted in a
  46 * number of contexts in Linux (blk, bio, genhd), but the default is
  47 * universally 512 bytes.  These symbols are just slightly more
  48 * meaningful than the bare numbers they represent.
  49 */
  50#define SECTOR_SHIFT    9
  51#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
  52
  53#define RBD_DRV_NAME "rbd"
  54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
  55
  56#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
  57
  58#define RBD_MAX_MD_NAME_LEN     (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
  59#define RBD_MAX_POOL_NAME_LEN   64
  60#define RBD_MAX_SNAP_NAME_LEN   32
  61#define RBD_MAX_OPT_LEN         1024
  62
  63#define RBD_SNAP_HEAD_NAME      "-"
  64
  65/*
  66 * An RBD device name will be "rbd#", where the "rbd" comes from
  67 * RBD_DRV_NAME above, and # is a unique integer identifier.
  68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
  69 * enough to hold all possible device names.
  70 */
  71#define DEV_NAME_LEN            32
  72#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
  73
  74#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
  75
  76/*
  77 * block device image metadata (in-memory version)
  78 */
  79struct rbd_image_header {
  80        u64 image_size;
  81        char block_name[32];
  82        __u8 obj_order;
  83        __u8 crypt_type;
  84        __u8 comp_type;
  85        struct ceph_snap_context *snapc;
  86        size_t snap_names_len;
  87        u64 snap_seq;
  88        u32 total_snaps;
  89
  90        char *snap_names;
  91        u64 *snap_sizes;
  92
  93        u64 obj_version;
  94};
  95
  96struct rbd_options {
  97        int     notify_timeout;
  98};
  99
 100/*
 101 * an instance of the client.  multiple devices may share an rbd client.
 102 */
 103struct rbd_client {
 104        struct ceph_client      *client;
 105        struct rbd_options      *rbd_opts;
 106        struct kref             kref;
 107        struct list_head        node;
 108};
 109
 110/*
 111 * a request completion status
 112 */
 113struct rbd_req_status {
 114        int done;
 115        int rc;
 116        u64 bytes;
 117};
 118
 119/*
 120 * a collection of requests
 121 */
 122struct rbd_req_coll {
 123        int                     total;
 124        int                     num_done;
 125        struct kref             kref;
 126        struct rbd_req_status   status[0];
 127};
 128
 129/*
 130 * a single io request
 131 */
 132struct rbd_request {
 133        struct request          *rq;            /* blk layer request */
 134        struct bio              *bio;           /* cloned bio */
 135        struct page             **pages;        /* list of used pages */
 136        u64                     len;
 137        int                     coll_index;
 138        struct rbd_req_coll     *coll;
 139};
 140
 141struct rbd_snap {
 142        struct  device          dev;
 143        const char              *name;
 144        size_t                  size;
 145        struct list_head        node;
 146        u64                     id;
 147};
 148
 149/*
 150 * a single device
 151 */
 152struct rbd_device {
 153        int                     id;             /* blkdev unique id */
 154
 155        int                     major;          /* blkdev assigned major */
 156        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 157        struct request_queue    *q;
 158
 159        struct rbd_client       *rbd_client;
 160
 161        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 162
 163        spinlock_t              lock;           /* queue lock */
 164
 165        struct rbd_image_header header;
 166        char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
 167        int                     obj_len;
 168        char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
 169        char                    pool_name[RBD_MAX_POOL_NAME_LEN];
 170        int                     poolid;
 171
 172        struct ceph_osd_event   *watch_event;
 173        struct ceph_osd_request *watch_request;
 174
 175        /* protects updating the header */
 176        struct rw_semaphore     header_rwsem;
 177        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
 178        u32 cur_snap;   /* index+1 of current snapshot within snap context
 179                           0 - for the head */
 180        int read_only;
 181
 182        struct list_head        node;
 183
 184        /* list of snapshots */
 185        struct list_head        snaps;
 186
 187        /* sysfs related */
 188        struct device           dev;
 189};
 190
 191static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
 192
 193static LIST_HEAD(rbd_dev_list);    /* devices */
 194static DEFINE_SPINLOCK(rbd_dev_list_lock);
 195
 196static LIST_HEAD(rbd_client_list);              /* clients */
 197static DEFINE_SPINLOCK(rbd_client_list_lock);
 198
 199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
 200static void rbd_dev_release(struct device *dev);
 201static ssize_t rbd_snap_add(struct device *dev,
 202                            struct device_attribute *attr,
 203                            const char *buf,
 204                            size_t count);
 205static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
 206                                  struct rbd_snap *snap);
 207
 208static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 209                       size_t count);
 210static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 211                          size_t count);
 212
 213static struct bus_attribute rbd_bus_attrs[] = {
 214        __ATTR(add, S_IWUSR, NULL, rbd_add),
 215        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
 216        __ATTR_NULL
 217};
 218
 219static struct bus_type rbd_bus_type = {
 220        .name           = "rbd",
 221        .bus_attrs      = rbd_bus_attrs,
 222};
 223
 224static void rbd_root_dev_release(struct device *dev)
 225{
 226}
 227
 228static struct device rbd_root_dev = {
 229        .init_name =    "rbd",
 230        .release =      rbd_root_dev_release,
 231};
 232
 233
 234static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 235{
 236        return get_device(&rbd_dev->dev);
 237}
 238
 239static void rbd_put_dev(struct rbd_device *rbd_dev)
 240{
 241        put_device(&rbd_dev->dev);
 242}
 243
 244static int __rbd_update_snaps(struct rbd_device *rbd_dev);
 245
 246static int rbd_open(struct block_device *bdev, fmode_t mode)
 247{
 248        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 249
 250        rbd_get_dev(rbd_dev);
 251
 252        set_device_ro(bdev, rbd_dev->read_only);
 253
 254        if ((mode & FMODE_WRITE) && rbd_dev->read_only)
 255                return -EROFS;
 256
 257        return 0;
 258}
 259
 260static int rbd_release(struct gendisk *disk, fmode_t mode)
 261{
 262        struct rbd_device *rbd_dev = disk->private_data;
 263
 264        rbd_put_dev(rbd_dev);
 265
 266        return 0;
 267}
 268
 269static const struct block_device_operations rbd_bd_ops = {
 270        .owner                  = THIS_MODULE,
 271        .open                   = rbd_open,
 272        .release                = rbd_release,
 273};
 274
 275/*
 276 * Initialize an rbd client instance.
 277 * We own *opt.
 278 */
 279static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 280                                            struct rbd_options *rbd_opts)
 281{
 282        struct rbd_client *rbdc;
 283        int ret = -ENOMEM;
 284
 285        dout("rbd_client_create\n");
 286        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 287        if (!rbdc)
 288                goto out_opt;
 289
 290        kref_init(&rbdc->kref);
 291        INIT_LIST_HEAD(&rbdc->node);
 292
 293        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 294
 295        rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
 296        if (IS_ERR(rbdc->client))
 297                goto out_mutex;
 298        opt = NULL; /* Now rbdc->client is responsible for opt */
 299
 300        ret = ceph_open_session(rbdc->client);
 301        if (ret < 0)
 302                goto out_err;
 303
 304        rbdc->rbd_opts = rbd_opts;
 305
 306        spin_lock(&rbd_client_list_lock);
 307        list_add_tail(&rbdc->node, &rbd_client_list);
 308        spin_unlock(&rbd_client_list_lock);
 309
 310        mutex_unlock(&ctl_mutex);
 311
 312        dout("rbd_client_create created %p\n", rbdc);
 313        return rbdc;
 314
 315out_err:
 316        ceph_destroy_client(rbdc->client);
 317out_mutex:
 318        mutex_unlock(&ctl_mutex);
 319        kfree(rbdc);
 320out_opt:
 321        if (opt)
 322                ceph_destroy_options(opt);
 323        return ERR_PTR(ret);
 324}
 325
 326/*
 327 * Find a ceph client with specific addr and configuration.
 328 */
 329static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
 330{
 331        struct rbd_client *client_node;
 332
 333        if (opt->flags & CEPH_OPT_NOSHARE)
 334                return NULL;
 335
 336        list_for_each_entry(client_node, &rbd_client_list, node)
 337                if (ceph_compare_options(opt, client_node->client) == 0)
 338                        return client_node;
 339        return NULL;
 340}
 341
 342/*
 343 * mount options
 344 */
 345enum {
 346        Opt_notify_timeout,
 347        Opt_last_int,
 348        /* int args above */
 349        Opt_last_string,
 350        /* string args above */
 351};
 352
 353static match_table_t rbdopt_tokens = {
 354        {Opt_notify_timeout, "notify_timeout=%d"},
 355        /* int args above */
 356        /* string args above */
 357        {-1, NULL}
 358};
 359
 360static int parse_rbd_opts_token(char *c, void *private)
 361{
 362        struct rbd_options *rbdopt = private;
 363        substring_t argstr[MAX_OPT_ARGS];
 364        int token, intval, ret;
 365
 366        token = match_token(c, rbdopt_tokens, argstr);
 367        if (token < 0)
 368                return -EINVAL;
 369
 370        if (token < Opt_last_int) {
 371                ret = match_int(&argstr[0], &intval);
 372                if (ret < 0) {
 373                        pr_err("bad mount option arg (not int) "
 374                               "at '%s'\n", c);
 375                        return ret;
 376                }
 377                dout("got int token %d val %d\n", token, intval);
 378        } else if (token > Opt_last_int && token < Opt_last_string) {
 379                dout("got string token %d val %s\n", token,
 380                     argstr[0].from);
 381        } else {
 382                dout("got token %d\n", token);
 383        }
 384
 385        switch (token) {
 386        case Opt_notify_timeout:
 387                rbdopt->notify_timeout = intval;
 388                break;
 389        default:
 390                BUG_ON(token);
 391        }
 392        return 0;
 393}
 394
 395/*
 396 * Get a ceph client with specific addr and configuration, if one does
 397 * not exist create it.
 398 */
 399static struct rbd_client *rbd_get_client(const char *mon_addr,
 400                                         size_t mon_addr_len,
 401                                         char *options)
 402{
 403        struct rbd_client *rbdc;
 404        struct ceph_options *opt;
 405        struct rbd_options *rbd_opts;
 406
 407        rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
 408        if (!rbd_opts)
 409                return ERR_PTR(-ENOMEM);
 410
 411        rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 412
 413        opt = ceph_parse_options(options, mon_addr,
 414                                mon_addr + mon_addr_len,
 415                                parse_rbd_opts_token, rbd_opts);
 416        if (IS_ERR(opt)) {
 417                kfree(rbd_opts);
 418                return ERR_CAST(opt);
 419        }
 420
 421        spin_lock(&rbd_client_list_lock);
 422        rbdc = __rbd_client_find(opt);
 423        if (rbdc) {
 424                /* using an existing client */
 425                kref_get(&rbdc->kref);
 426                spin_unlock(&rbd_client_list_lock);
 427
 428                ceph_destroy_options(opt);
 429                kfree(rbd_opts);
 430
 431                return rbdc;
 432        }
 433        spin_unlock(&rbd_client_list_lock);
 434
 435        rbdc = rbd_client_create(opt, rbd_opts);
 436
 437        if (IS_ERR(rbdc))
 438                kfree(rbd_opts);
 439
 440        return rbdc;
 441}
 442
 443/*
 444 * Destroy ceph client
 445 *
 446 * Caller must hold rbd_client_list_lock.
 447 */
 448static void rbd_client_release(struct kref *kref)
 449{
 450        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 451
 452        dout("rbd_release_client %p\n", rbdc);
 453        list_del(&rbdc->node);
 454
 455        ceph_destroy_client(rbdc->client);
 456        kfree(rbdc->rbd_opts);
 457        kfree(rbdc);
 458}
 459
 460/*
 461 * Drop reference to ceph client node. If it's not referenced anymore, release
 462 * it.
 463 */
 464static void rbd_put_client(struct rbd_device *rbd_dev)
 465{
 466        spin_lock(&rbd_client_list_lock);
 467        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
 468        spin_unlock(&rbd_client_list_lock);
 469        rbd_dev->rbd_client = NULL;
 470}
 471
 472/*
 473 * Destroy requests collection
 474 */
 475static void rbd_coll_release(struct kref *kref)
 476{
 477        struct rbd_req_coll *coll =
 478                container_of(kref, struct rbd_req_coll, kref);
 479
 480        dout("rbd_coll_release %p\n", coll);
 481        kfree(coll);
 482}
 483
 484/*
 485 * Create a new header structure, translate header format from the on-disk
 486 * header.
 487 */
 488static int rbd_header_from_disk(struct rbd_image_header *header,
 489                                 struct rbd_image_header_ondisk *ondisk,
 490                                 int allocated_snaps,
 491                                 gfp_t gfp_flags)
 492{
 493        int i;
 494        u32 snap_count;
 495
 496        if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
 497                return -ENXIO;
 498
 499        snap_count = le32_to_cpu(ondisk->snap_count);
 500        header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
 501                                snap_count * sizeof (*ondisk),
 502                                gfp_flags);
 503        if (!header->snapc)
 504                return -ENOMEM;
 505
 506        header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 507        if (snap_count) {
 508                header->snap_names = kmalloc(header->snap_names_len,
 509                                             GFP_KERNEL);
 510                if (!header->snap_names)
 511                        goto err_snapc;
 512                header->snap_sizes = kmalloc(snap_count * sizeof(u64),
 513                                             GFP_KERNEL);
 514                if (!header->snap_sizes)
 515                        goto err_names;
 516        } else {
 517                header->snap_names = NULL;
 518                header->snap_sizes = NULL;
 519        }
 520        memcpy(header->block_name, ondisk->block_name,
 521               sizeof(ondisk->block_name));
 522
 523        header->image_size = le64_to_cpu(ondisk->image_size);
 524        header->obj_order = ondisk->options.order;
 525        header->crypt_type = ondisk->options.crypt_type;
 526        header->comp_type = ondisk->options.comp_type;
 527
 528        atomic_set(&header->snapc->nref, 1);
 529        header->snap_seq = le64_to_cpu(ondisk->snap_seq);
 530        header->snapc->num_snaps = snap_count;
 531        header->total_snaps = snap_count;
 532
 533        if (snap_count && allocated_snaps == snap_count) {
 534                for (i = 0; i < snap_count; i++) {
 535                        header->snapc->snaps[i] =
 536                                le64_to_cpu(ondisk->snaps[i].id);
 537                        header->snap_sizes[i] =
 538                                le64_to_cpu(ondisk->snaps[i].image_size);
 539                }
 540
 541                /* copy snapshot names */
 542                memcpy(header->snap_names, &ondisk->snaps[i],
 543                        header->snap_names_len);
 544        }
 545
 546        return 0;
 547
 548err_names:
 549        kfree(header->snap_names);
 550err_snapc:
 551        kfree(header->snapc);
 552        return -ENOMEM;
 553}
 554
 555static int snap_index(struct rbd_image_header *header, int snap_num)
 556{
 557        return header->total_snaps - snap_num;
 558}
 559
 560static u64 cur_snap_id(struct rbd_device *rbd_dev)
 561{
 562        struct rbd_image_header *header = &rbd_dev->header;
 563
 564        if (!rbd_dev->cur_snap)
 565                return 0;
 566
 567        return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
 568}
 569
 570static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
 571                        u64 *seq, u64 *size)
 572{
 573        int i;
 574        char *p = header->snap_names;
 575
 576        for (i = 0; i < header->total_snaps; i++) {
 577                if (!strcmp(snap_name, p)) {
 578
 579                        /* Found it.  Pass back its id and/or size */
 580
 581                        if (seq)
 582                                *seq = header->snapc->snaps[i];
 583                        if (size)
 584                                *size = header->snap_sizes[i];
 585                        return i;
 586                }
 587                p += strlen(p) + 1;     /* Skip ahead to the next name */
 588        }
 589        return -ENOENT;
 590}
 591
 592static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
 593{
 594        struct rbd_image_header *header = &dev->header;
 595        struct ceph_snap_context *snapc = header->snapc;
 596        int ret = -ENOENT;
 597
 598        BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
 599
 600        down_write(&dev->header_rwsem);
 601
 602        if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
 603                    sizeof (RBD_SNAP_HEAD_NAME))) {
 604                if (header->total_snaps)
 605                        snapc->seq = header->snap_seq;
 606                else
 607                        snapc->seq = 0;
 608                dev->cur_snap = 0;
 609                dev->read_only = 0;
 610                if (size)
 611                        *size = header->image_size;
 612        } else {
 613                ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
 614                if (ret < 0)
 615                        goto done;
 616
 617                dev->cur_snap = header->total_snaps - ret;
 618                dev->read_only = 1;
 619        }
 620
 621        ret = 0;
 622done:
 623        up_write(&dev->header_rwsem);
 624        return ret;
 625}
 626
 627static void rbd_header_free(struct rbd_image_header *header)
 628{
 629        kfree(header->snapc);
 630        kfree(header->snap_names);
 631        kfree(header->snap_sizes);
 632}
 633
 634/*
 635 * get the actual striped segment name, offset and length
 636 */
 637static u64 rbd_get_segment(struct rbd_image_header *header,
 638                           const char *block_name,
 639                           u64 ofs, u64 len,
 640                           char *seg_name, u64 *segofs)
 641{
 642        u64 seg = ofs >> header->obj_order;
 643
 644        if (seg_name)
 645                snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
 646                         "%s.%012llx", block_name, seg);
 647
 648        ofs = ofs & ((1 << header->obj_order) - 1);
 649        len = min_t(u64, len, (1 << header->obj_order) - ofs);
 650
 651        if (segofs)
 652                *segofs = ofs;
 653
 654        return len;
 655}
 656
 657static int rbd_get_num_segments(struct rbd_image_header *header,
 658                                u64 ofs, u64 len)
 659{
 660        u64 start_seg = ofs >> header->obj_order;
 661        u64 end_seg = (ofs + len - 1) >> header->obj_order;
 662        return end_seg - start_seg + 1;
 663}
 664
 665/*
 666 * returns the size of an object in the image
 667 */
 668static u64 rbd_obj_bytes(struct rbd_image_header *header)
 669{
 670        return 1 << header->obj_order;
 671}
 672
 673/*
 674 * bio helpers
 675 */
 676
 677static void bio_chain_put(struct bio *chain)
 678{
 679        struct bio *tmp;
 680
 681        while (chain) {
 682                tmp = chain;
 683                chain = chain->bi_next;
 684                bio_put(tmp);
 685        }
 686}
 687
 688/*
 689 * zeros a bio chain, starting at specific offset
 690 */
 691static void zero_bio_chain(struct bio *chain, int start_ofs)
 692{
 693        struct bio_vec *bv;
 694        unsigned long flags;
 695        void *buf;
 696        int i;
 697        int pos = 0;
 698
 699        while (chain) {
 700                bio_for_each_segment(bv, chain, i) {
 701                        if (pos + bv->bv_len > start_ofs) {
 702                                int remainder = max(start_ofs - pos, 0);
 703                                buf = bvec_kmap_irq(bv, &flags);
 704                                memset(buf + remainder, 0,
 705                                       bv->bv_len - remainder);
 706                                bvec_kunmap_irq(buf, &flags);
 707                        }
 708                        pos += bv->bv_len;
 709                }
 710
 711                chain = chain->bi_next;
 712        }
 713}
 714
 715/*
 716 * bio_chain_clone - clone a chain of bios up to a certain length.
 717 * might return a bio_pair that will need to be released.
 718 */
 719static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 720                                   struct bio_pair **bp,
 721                                   int len, gfp_t gfpmask)
 722{
 723        struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
 724        int total = 0;
 725
 726        if (*bp) {
 727                bio_pair_release(*bp);
 728                *bp = NULL;
 729        }
 730
 731        while (old_chain && (total < len)) {
 732                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
 733                if (!tmp)
 734                        goto err_out;
 735
 736                if (total + old_chain->bi_size > len) {
 737                        struct bio_pair *bp;
 738
 739                        /*
 740                         * this split can only happen with a single paged bio,
 741                         * split_bio will BUG_ON if this is not the case
 742                         */
 743                        dout("bio_chain_clone split! total=%d remaining=%d"
 744                             "bi_size=%d\n",
 745                             (int)total, (int)len-total,
 746                             (int)old_chain->bi_size);
 747
 748                        /* split the bio. We'll release it either in the next
 749                           call, or it will have to be released outside */
 750                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
 751                        if (!bp)
 752                                goto err_out;
 753
 754                        __bio_clone(tmp, &bp->bio1);
 755
 756                        *next = &bp->bio2;
 757                } else {
 758                        __bio_clone(tmp, old_chain);
 759                        *next = old_chain->bi_next;
 760                }
 761
 762                tmp->bi_bdev = NULL;
 763                gfpmask &= ~__GFP_WAIT;
 764                tmp->bi_next = NULL;
 765
 766                if (!new_chain) {
 767                        new_chain = tail = tmp;
 768                } else {
 769                        tail->bi_next = tmp;
 770                        tail = tmp;
 771                }
 772                old_chain = old_chain->bi_next;
 773
 774                total += tmp->bi_size;
 775        }
 776
 777        BUG_ON(total < len);
 778
 779        if (tail)
 780                tail->bi_next = NULL;
 781
 782        *old = old_chain;
 783
 784        return new_chain;
 785
 786err_out:
 787        dout("bio_chain_clone with err\n");
 788        bio_chain_put(new_chain);
 789        return NULL;
 790}
 791
 792/*
 793 * helpers for osd request op vectors.
 794 */
 795static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
 796                            int num_ops,
 797                            int opcode,
 798                            u32 payload_len)
 799{
 800        *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
 801                       GFP_NOIO);
 802        if (!*ops)
 803                return -ENOMEM;
 804        (*ops)[0].op = opcode;
 805        /*
 806         * op extent offset and length will be set later on
 807         * in calc_raw_layout()
 808         */
 809        (*ops)[0].payload_len = payload_len;
 810        return 0;
 811}
 812
 813static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
 814{
 815        kfree(ops);
 816}
 817
 818static void rbd_coll_end_req_index(struct request *rq,
 819                                   struct rbd_req_coll *coll,
 820                                   int index,
 821                                   int ret, u64 len)
 822{
 823        struct request_queue *q;
 824        int min, max, i;
 825
 826        dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
 827             coll, index, ret, len);
 828
 829        if (!rq)
 830                return;
 831
 832        if (!coll) {
 833                blk_end_request(rq, ret, len);
 834                return;
 835        }
 836
 837        q = rq->q;
 838
 839        spin_lock_irq(q->queue_lock);
 840        coll->status[index].done = 1;
 841        coll->status[index].rc = ret;
 842        coll->status[index].bytes = len;
 843        max = min = coll->num_done;
 844        while (max < coll->total && coll->status[max].done)
 845                max++;
 846
 847        for (i = min; i<max; i++) {
 848                __blk_end_request(rq, coll->status[i].rc,
 849                                  coll->status[i].bytes);
 850                coll->num_done++;
 851                kref_put(&coll->kref, rbd_coll_release);
 852        }
 853        spin_unlock_irq(q->queue_lock);
 854}
 855
 856static void rbd_coll_end_req(struct rbd_request *req,
 857                             int ret, u64 len)
 858{
 859        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
 860}
 861
 862/*
 863 * Send ceph osd request
 864 */
 865static int rbd_do_request(struct request *rq,
 866                          struct rbd_device *dev,
 867                          struct ceph_snap_context *snapc,
 868                          u64 snapid,
 869                          const char *obj, u64 ofs, u64 len,
 870                          struct bio *bio,
 871                          struct page **pages,
 872                          int num_pages,
 873                          int flags,
 874                          struct ceph_osd_req_op *ops,
 875                          int num_reply,
 876                          struct rbd_req_coll *coll,
 877                          int coll_index,
 878                          void (*rbd_cb)(struct ceph_osd_request *req,
 879                                         struct ceph_msg *msg),
 880                          struct ceph_osd_request **linger_req,
 881                          u64 *ver)
 882{
 883        struct ceph_osd_request *req;
 884        struct ceph_file_layout *layout;
 885        int ret;
 886        u64 bno;
 887        struct timespec mtime = CURRENT_TIME;
 888        struct rbd_request *req_data;
 889        struct ceph_osd_request_head *reqhead;
 890        struct ceph_osd_client *osdc;
 891
 892        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
 893        if (!req_data) {
 894                if (coll)
 895                        rbd_coll_end_req_index(rq, coll, coll_index,
 896                                               -ENOMEM, len);
 897                return -ENOMEM;
 898        }
 899
 900        if (coll) {
 901                req_data->coll = coll;
 902                req_data->coll_index = coll_index;
 903        }
 904
 905        dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
 906
 907        down_read(&dev->header_rwsem);
 908
 909        osdc = &dev->rbd_client->client->osdc;
 910        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
 911                                        false, GFP_NOIO, pages, bio);
 912        if (!req) {
 913                up_read(&dev->header_rwsem);
 914                ret = -ENOMEM;
 915                goto done_pages;
 916        }
 917
 918        req->r_callback = rbd_cb;
 919
 920        req_data->rq = rq;
 921        req_data->bio = bio;
 922        req_data->pages = pages;
 923        req_data->len = len;
 924
 925        req->r_priv = req_data;
 926
 927        reqhead = req->r_request->front.iov_base;
 928        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 929
 930        strncpy(req->r_oid, obj, sizeof(req->r_oid));
 931        req->r_oid_len = strlen(req->r_oid);
 932
 933        layout = &req->r_file_layout;
 934        memset(layout, 0, sizeof(*layout));
 935        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 936        layout->fl_stripe_count = cpu_to_le32(1);
 937        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 938        layout->fl_pg_preferred = cpu_to_le32(-1);
 939        layout->fl_pg_pool = cpu_to_le32(dev->poolid);
 940        ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 941                                req, ops);
 942
 943        ceph_osdc_build_request(req, ofs, &len,
 944                                ops,
 945                                snapc,
 946                                &mtime,
 947                                req->r_oid, req->r_oid_len);
 948        up_read(&dev->header_rwsem);
 949
 950        if (linger_req) {
 951                ceph_osdc_set_request_linger(osdc, req);
 952                *linger_req = req;
 953        }
 954
 955        ret = ceph_osdc_start_request(osdc, req, false);
 956        if (ret < 0)
 957                goto done_err;
 958
 959        if (!rbd_cb) {
 960                ret = ceph_osdc_wait_request(osdc, req);
 961                if (ver)
 962                        *ver = le64_to_cpu(req->r_reassert_version.version);
 963                dout("reassert_ver=%lld\n",
 964                     le64_to_cpu(req->r_reassert_version.version));
 965                ceph_osdc_put_request(req);
 966        }
 967        return ret;
 968
 969done_err:
 970        bio_chain_put(req_data->bio);
 971        ceph_osdc_put_request(req);
 972done_pages:
 973        rbd_coll_end_req(req_data, ret, len);
 974        kfree(req_data);
 975        return ret;
 976}
 977
 978/*
 979 * Ceph osd op callback
 980 */
 981static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
 982{
 983        struct rbd_request *req_data = req->r_priv;
 984        struct ceph_osd_reply_head *replyhead;
 985        struct ceph_osd_op *op;
 986        __s32 rc;
 987        u64 bytes;
 988        int read_op;
 989
 990        /* parse reply */
 991        replyhead = msg->front.iov_base;
 992        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
 993        op = (void *)(replyhead + 1);
 994        rc = le32_to_cpu(replyhead->result);
 995        bytes = le64_to_cpu(op->extent.length);
 996        read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
 997
 998        dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
 999
1000        if (rc == -ENOENT && read_op) {
1001                zero_bio_chain(req_data->bio, 0);
1002                rc = 0;
1003        } else if (rc == 0 && read_op && bytes < req_data->len) {
1004                zero_bio_chain(req_data->bio, bytes);
1005                bytes = req_data->len;
1006        }
1007
1008        rbd_coll_end_req(req_data, rc, bytes);
1009
1010        if (req_data->bio)
1011                bio_chain_put(req_data->bio);
1012
1013        ceph_osdc_put_request(req);
1014        kfree(req_data);
1015}
1016
1017static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1018{
1019        ceph_osdc_put_request(req);
1020}
1021
1022/*
1023 * Do a synchronous ceph osd operation
1024 */
1025static int rbd_req_sync_op(struct rbd_device *dev,
1026                           struct ceph_snap_context *snapc,
1027                           u64 snapid,
1028                           int opcode,
1029                           int flags,
1030                           struct ceph_osd_req_op *orig_ops,
1031                           int num_reply,
1032                           const char *obj,
1033                           u64 ofs, u64 len,
1034                           char *buf,
1035                           struct ceph_osd_request **linger_req,
1036                           u64 *ver)
1037{
1038        int ret;
1039        struct page **pages;
1040        int num_pages;
1041        struct ceph_osd_req_op *ops = orig_ops;
1042        u32 payload_len;
1043
1044        num_pages = calc_pages_for(ofs , len);
1045        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1046        if (IS_ERR(pages))
1047                return PTR_ERR(pages);
1048
1049        if (!orig_ops) {
1050                payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1051                ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1052                if (ret < 0)
1053                        goto done;
1054
1055                if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1056                        ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1057                        if (ret < 0)
1058                                goto done_ops;
1059                }
1060        }
1061
1062        ret = rbd_do_request(NULL, dev, snapc, snapid,
1063                          obj, ofs, len, NULL,
1064                          pages, num_pages,
1065                          flags,
1066                          ops,
1067                          2,
1068                          NULL, 0,
1069                          NULL,
1070                          linger_req, ver);
1071        if (ret < 0)
1072                goto done_ops;
1073
1074        if ((flags & CEPH_OSD_FLAG_READ) && buf)
1075                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1076
1077done_ops:
1078        if (!orig_ops)
1079                rbd_destroy_ops(ops);
1080done:
1081        ceph_release_page_vector(pages, num_pages);
1082        return ret;
1083}
1084
1085/*
1086 * Do an asynchronous ceph osd operation
1087 */
1088static int rbd_do_op(struct request *rq,
1089                     struct rbd_device *rbd_dev ,
1090                     struct ceph_snap_context *snapc,
1091                     u64 snapid,
1092                     int opcode, int flags, int num_reply,
1093                     u64 ofs, u64 len,
1094                     struct bio *bio,
1095                     struct rbd_req_coll *coll,
1096                     int coll_index)
1097{
1098        char *seg_name;
1099        u64 seg_ofs;
1100        u64 seg_len;
1101        int ret;
1102        struct ceph_osd_req_op *ops;
1103        u32 payload_len;
1104
1105        seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1106        if (!seg_name)
1107                return -ENOMEM;
1108
1109        seg_len = rbd_get_segment(&rbd_dev->header,
1110                                  rbd_dev->header.block_name,
1111                                  ofs, len,
1112                                  seg_name, &seg_ofs);
1113
1114        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1115
1116        ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1117        if (ret < 0)
1118                goto done;
1119
1120        /* we've taken care of segment sizes earlier when we
1121           cloned the bios. We should never have a segment
1122           truncated at this point */
1123        BUG_ON(seg_len < len);
1124
1125        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1126                             seg_name, seg_ofs, seg_len,
1127                             bio,
1128                             NULL, 0,
1129                             flags,
1130                             ops,
1131                             num_reply,
1132                             coll, coll_index,
1133                             rbd_req_cb, 0, NULL);
1134
1135        rbd_destroy_ops(ops);
1136done:
1137        kfree(seg_name);
1138        return ret;
1139}
1140
1141/*
1142 * Request async osd write
1143 */
1144static int rbd_req_write(struct request *rq,
1145                         struct rbd_device *rbd_dev,
1146                         struct ceph_snap_context *snapc,
1147                         u64 ofs, u64 len,
1148                         struct bio *bio,
1149                         struct rbd_req_coll *coll,
1150                         int coll_index)
1151{
1152        return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1153                         CEPH_OSD_OP_WRITE,
1154                         CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1155                         2,
1156                         ofs, len, bio, coll, coll_index);
1157}
1158
1159/*
1160 * Request async osd read
1161 */
1162static int rbd_req_read(struct request *rq,
1163                         struct rbd_device *rbd_dev,
1164                         u64 snapid,
1165                         u64 ofs, u64 len,
1166                         struct bio *bio,
1167                         struct rbd_req_coll *coll,
1168                         int coll_index)
1169{
1170        return rbd_do_op(rq, rbd_dev, NULL,
1171                         (snapid ? snapid : CEPH_NOSNAP),
1172                         CEPH_OSD_OP_READ,
1173                         CEPH_OSD_FLAG_READ,
1174                         2,
1175                         ofs, len, bio, coll, coll_index);
1176}
1177
1178/*
1179 * Request sync osd read
1180 */
1181static int rbd_req_sync_read(struct rbd_device *dev,
1182                          struct ceph_snap_context *snapc,
1183                          u64 snapid,
1184                          const char *obj,
1185                          u64 ofs, u64 len,
1186                          char *buf,
1187                          u64 *ver)
1188{
1189        return rbd_req_sync_op(dev, NULL,
1190                               (snapid ? snapid : CEPH_NOSNAP),
1191                               CEPH_OSD_OP_READ,
1192                               CEPH_OSD_FLAG_READ,
1193                               NULL,
1194                               1, obj, ofs, len, buf, NULL, ver);
1195}
1196
1197/*
1198 * Request sync osd watch
1199 */
1200static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1201                                   u64 ver,
1202                                   u64 notify_id,
1203                                   const char *obj)
1204{
1205        struct ceph_osd_req_op *ops;
1206        struct page **pages = NULL;
1207        int ret;
1208
1209        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1210        if (ret < 0)
1211                return ret;
1212
1213        ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1214        ops[0].watch.cookie = notify_id;
1215        ops[0].watch.flag = 0;
1216
1217        ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1218                          obj, 0, 0, NULL,
1219                          pages, 0,
1220                          CEPH_OSD_FLAG_READ,
1221                          ops,
1222                          1,
1223                          NULL, 0,
1224                          rbd_simple_req_cb, 0, NULL);
1225
1226        rbd_destroy_ops(ops);
1227        return ret;
1228}
1229
1230static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1231{
1232        struct rbd_device *dev = (struct rbd_device *)data;
1233        int rc;
1234
1235        if (!dev)
1236                return;
1237
1238        dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1239                notify_id, (int)opcode);
1240        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1241        rc = __rbd_update_snaps(dev);
1242        mutex_unlock(&ctl_mutex);
1243        if (rc)
1244                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1245                           " update snaps: %d\n", dev->major, rc);
1246
1247        rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1248}
1249
1250/*
1251 * Request sync osd watch
1252 */
1253static int rbd_req_sync_watch(struct rbd_device *dev,
1254                              const char *obj,
1255                              u64 ver)
1256{
1257        struct ceph_osd_req_op *ops;
1258        struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1259
1260        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1261        if (ret < 0)
1262                return ret;
1263
1264        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1265                                     (void *)dev, &dev->watch_event);
1266        if (ret < 0)
1267                goto fail;
1268
1269        ops[0].watch.ver = cpu_to_le64(ver);
1270        ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1271        ops[0].watch.flag = 1;
1272
1273        ret = rbd_req_sync_op(dev, NULL,
1274                              CEPH_NOSNAP,
1275                              0,
1276                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1277                              ops,
1278                              1, obj, 0, 0, NULL,
1279                              &dev->watch_request, NULL);
1280
1281        if (ret < 0)
1282                goto fail_event;
1283
1284        rbd_destroy_ops(ops);
1285        return 0;
1286
1287fail_event:
1288        ceph_osdc_cancel_event(dev->watch_event);
1289        dev->watch_event = NULL;
1290fail:
1291        rbd_destroy_ops(ops);
1292        return ret;
1293}
1294
1295/*
1296 * Request sync osd unwatch
1297 */
1298static int rbd_req_sync_unwatch(struct rbd_device *dev,
1299                                const char *obj)
1300{
1301        struct ceph_osd_req_op *ops;
1302
1303        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1304        if (ret < 0)
1305                return ret;
1306
1307        ops[0].watch.ver = 0;
1308        ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1309        ops[0].watch.flag = 0;
1310
1311        ret = rbd_req_sync_op(dev, NULL,
1312                              CEPH_NOSNAP,
1313                              0,
1314                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1315                              ops,
1316                              1, obj, 0, 0, NULL, NULL, NULL);
1317
1318        rbd_destroy_ops(ops);
1319        ceph_osdc_cancel_event(dev->watch_event);
1320        dev->watch_event = NULL;
1321        return ret;
1322}
1323
1324struct rbd_notify_info {
1325        struct rbd_device *dev;
1326};
1327
1328static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1329{
1330        struct rbd_device *dev = (struct rbd_device *)data;
1331        if (!dev)
1332                return;
1333
1334        dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1335                notify_id, (int)opcode);
1336}
1337
1338/*
1339 * Request sync osd notify
1340 */
1341static int rbd_req_sync_notify(struct rbd_device *dev,
1342                          const char *obj)
1343{
1344        struct ceph_osd_req_op *ops;
1345        struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1346        struct ceph_osd_event *event;
1347        struct rbd_notify_info info;
1348        int payload_len = sizeof(u32) + sizeof(u32);
1349        int ret;
1350
1351        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1352        if (ret < 0)
1353                return ret;
1354
1355        info.dev = dev;
1356
1357        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1358                                     (void *)&info, &event);
1359        if (ret < 0)
1360                goto fail;
1361
1362        ops[0].watch.ver = 1;
1363        ops[0].watch.flag = 1;
1364        ops[0].watch.cookie = event->cookie;
1365        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1366        ops[0].watch.timeout = 12;
1367
1368        ret = rbd_req_sync_op(dev, NULL,
1369                               CEPH_NOSNAP,
1370                               0,
1371                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1372                               ops,
1373                               1, obj, 0, 0, NULL, NULL, NULL);
1374        if (ret < 0)
1375                goto fail_event;
1376
1377        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1378        dout("ceph_osdc_wait_event returned %d\n", ret);
1379        rbd_destroy_ops(ops);
1380        return 0;
1381
1382fail_event:
1383        ceph_osdc_cancel_event(event);
1384fail:
1385        rbd_destroy_ops(ops);
1386        return ret;
1387}
1388
1389/*
1390 * Request sync osd read
1391 */
1392static int rbd_req_sync_exec(struct rbd_device *dev,
1393                             const char *obj,
1394                             const char *cls,
1395                             const char *method,
1396                             const char *data,
1397                             int len,
1398                             u64 *ver)
1399{
1400        struct ceph_osd_req_op *ops;
1401        int cls_len = strlen(cls);
1402        int method_len = strlen(method);
1403        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1404                                    cls_len + method_len + len);
1405        if (ret < 0)
1406                return ret;
1407
1408        ops[0].cls.class_name = cls;
1409        ops[0].cls.class_len = (__u8)cls_len;
1410        ops[0].cls.method_name = method;
1411        ops[0].cls.method_len = (__u8)method_len;
1412        ops[0].cls.argc = 0;
1413        ops[0].cls.indata = data;
1414        ops[0].cls.indata_len = len;
1415
1416        ret = rbd_req_sync_op(dev, NULL,
1417                               CEPH_NOSNAP,
1418                               0,
1419                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1420                               ops,
1421                               1, obj, 0, 0, NULL, NULL, ver);
1422
1423        rbd_destroy_ops(ops);
1424
1425        dout("cls_exec returned %d\n", ret);
1426        return ret;
1427}
1428
1429static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1430{
1431        struct rbd_req_coll *coll =
1432                        kzalloc(sizeof(struct rbd_req_coll) +
1433                                sizeof(struct rbd_req_status) * num_reqs,
1434                                GFP_ATOMIC);
1435
1436        if (!coll)
1437                return NULL;
1438        coll->total = num_reqs;
1439        kref_init(&coll->kref);
1440        return coll;
1441}
1442
1443/*
1444 * block device queue callback
1445 */
1446static void rbd_rq_fn(struct request_queue *q)
1447{
1448        struct rbd_device *rbd_dev = q->queuedata;
1449        struct request *rq;
1450        struct bio_pair *bp = NULL;
1451
1452        while ((rq = blk_fetch_request(q))) {
1453                struct bio *bio;
1454                struct bio *rq_bio, *next_bio = NULL;
1455                bool do_write;
1456                int size, op_size = 0;
1457                u64 ofs;
1458                int num_segs, cur_seg = 0;
1459                struct rbd_req_coll *coll;
1460
1461                /* peek at request from block layer */
1462                if (!rq)
1463                        break;
1464
1465                dout("fetched request\n");
1466
1467                /* filter out block requests we don't understand */
1468                if ((rq->cmd_type != REQ_TYPE_FS)) {
1469                        __blk_end_request_all(rq, 0);
1470                        continue;
1471                }
1472
1473                /* deduce our operation (read, write) */
1474                do_write = (rq_data_dir(rq) == WRITE);
1475
1476                size = blk_rq_bytes(rq);
1477                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1478                rq_bio = rq->bio;
1479                if (do_write && rbd_dev->read_only) {
1480                        __blk_end_request_all(rq, -EROFS);
1481                        continue;
1482                }
1483
1484                spin_unlock_irq(q->queue_lock);
1485
1486                dout("%s 0x%x bytes at 0x%llx\n",
1487                     do_write ? "write" : "read",
1488                     size, blk_rq_pos(rq) * SECTOR_SIZE);
1489
1490                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1491                coll = rbd_alloc_coll(num_segs);
1492                if (!coll) {
1493                        spin_lock_irq(q->queue_lock);
1494                        __blk_end_request_all(rq, -ENOMEM);
1495                        continue;
1496                }
1497
1498                do {
1499                        /* a bio clone to be passed down to OSD req */
1500                        dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1501                        op_size = rbd_get_segment(&rbd_dev->header,
1502                                                  rbd_dev->header.block_name,
1503                                                  ofs, size,
1504                                                  NULL, NULL);
1505                        kref_get(&coll->kref);
1506                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1507                                              op_size, GFP_ATOMIC);
1508                        if (!bio) {
1509                                rbd_coll_end_req_index(rq, coll, cur_seg,
1510                                                       -ENOMEM, op_size);
1511                                goto next_seg;
1512                        }
1513
1514
1515                        /* init OSD command: write or read */
1516                        if (do_write)
1517                                rbd_req_write(rq, rbd_dev,
1518                                              rbd_dev->header.snapc,
1519                                              ofs,
1520                                              op_size, bio,
1521                                              coll, cur_seg);
1522                        else
1523                                rbd_req_read(rq, rbd_dev,
1524                                             cur_snap_id(rbd_dev),
1525                                             ofs,
1526                                             op_size, bio,
1527                                             coll, cur_seg);
1528
1529next_seg:
1530                        size -= op_size;
1531                        ofs += op_size;
1532
1533                        cur_seg++;
1534                        rq_bio = next_bio;
1535                } while (size > 0);
1536                kref_put(&coll->kref, rbd_coll_release);
1537
1538                if (bp)
1539                        bio_pair_release(bp);
1540                spin_lock_irq(q->queue_lock);
1541        }
1542}
1543
1544/*
1545 * a queue callback. Makes sure that we don't create a bio that spans across
1546 * multiple osd objects. One exception would be with a single page bios,
1547 * which we handle later at bio_chain_clone
1548 */
1549static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1550                          struct bio_vec *bvec)
1551{
1552        struct rbd_device *rbd_dev = q->queuedata;
1553        unsigned int chunk_sectors;
1554        sector_t sector;
1555        unsigned int bio_sectors;
1556        int max;
1557
1558        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1559        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1560        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1561
1562        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1563                                 + bio_sectors)) << SECTOR_SHIFT;
1564        if (max < 0)
1565                max = 0; /* bio_add cannot handle a negative return */
1566        if (max <= bvec->bv_len && bio_sectors == 0)
1567                return bvec->bv_len;
1568        return max;
1569}
1570
1571static void rbd_free_disk(struct rbd_device *rbd_dev)
1572{
1573        struct gendisk *disk = rbd_dev->disk;
1574
1575        if (!disk)
1576                return;
1577
1578        rbd_header_free(&rbd_dev->header);
1579
1580        if (disk->flags & GENHD_FL_UP)
1581                del_gendisk(disk);
1582        if (disk->queue)
1583                blk_cleanup_queue(disk->queue);
1584        put_disk(disk);
1585}
1586
1587/*
1588 * reload the ondisk the header 
1589 */
1590static int rbd_read_header(struct rbd_device *rbd_dev,
1591                           struct rbd_image_header *header)
1592{
1593        ssize_t rc;
1594        struct rbd_image_header_ondisk *dh;
1595        int snap_count = 0;
1596        u64 ver;
1597        size_t len;
1598
1599        /*
1600         * First reads the fixed-size header to determine the number
1601         * of snapshots, then re-reads it, along with all snapshot
1602         * records as well as their stored names.
1603         */
1604        len = sizeof (*dh);
1605        while (1) {
1606                dh = kmalloc(len, GFP_KERNEL);
1607                if (!dh)
1608                        return -ENOMEM;
1609
1610                rc = rbd_req_sync_read(rbd_dev,
1611                                       NULL, CEPH_NOSNAP,
1612                                       rbd_dev->obj_md_name,
1613                                       0, len,
1614                                       (char *)dh, &ver);
1615                if (rc < 0)
1616                        goto out_dh;
1617
1618                rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1619                if (rc < 0) {
1620                        if (rc == -ENXIO)
1621                                pr_warning("unrecognized header format"
1622                                           " for image %s", rbd_dev->obj);
1623                        goto out_dh;
1624                }
1625
1626                if (snap_count == header->total_snaps)
1627                        break;
1628
1629                snap_count = header->total_snaps;
1630                len = sizeof (*dh) +
1631                        snap_count * sizeof(struct rbd_image_snap_ondisk) +
1632                        header->snap_names_len;
1633
1634                rbd_header_free(header);
1635                kfree(dh);
1636        }
1637        header->obj_version = ver;
1638
1639out_dh:
1640        kfree(dh);
1641        return rc;
1642}
1643
1644/*
1645 * create a snapshot
1646 */
1647static int rbd_header_add_snap(struct rbd_device *dev,
1648                               const char *snap_name,
1649                               gfp_t gfp_flags)
1650{
1651        int name_len = strlen(snap_name);
1652        u64 new_snapid;
1653        int ret;
1654        void *data, *p, *e;
1655        u64 ver;
1656        struct ceph_mon_client *monc;
1657
1658        /* we should create a snapshot only if we're pointing at the head */
1659        if (dev->cur_snap)
1660                return -EINVAL;
1661
1662        monc = &dev->rbd_client->client->monc;
1663        ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1664        dout("created snapid=%lld\n", new_snapid);
1665        if (ret < 0)
1666                return ret;
1667
1668        data = kmalloc(name_len + 16, gfp_flags);
1669        if (!data)
1670                return -ENOMEM;
1671
1672        p = data;
1673        e = data + name_len + 16;
1674
1675        ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1676        ceph_encode_64_safe(&p, e, new_snapid, bad);
1677
1678        ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1679                                data, p - data, &ver);
1680
1681        kfree(data);
1682
1683        if (ret < 0)
1684                return ret;
1685
1686        dev->header.snapc->seq =  new_snapid;
1687
1688        return 0;
1689bad:
1690        return -ERANGE;
1691}
1692
1693static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1694{
1695        struct rbd_snap *snap;
1696
1697        while (!list_empty(&rbd_dev->snaps)) {
1698                snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1699                __rbd_remove_snap_dev(rbd_dev, snap);
1700        }
1701}
1702
1703/*
1704 * only read the first part of the ondisk header, without the snaps info
1705 */
1706static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1707{
1708        int ret;
1709        struct rbd_image_header h;
1710        u64 snap_seq;
1711        int follow_seq = 0;
1712
1713        ret = rbd_read_header(rbd_dev, &h);
1714        if (ret < 0)
1715                return ret;
1716
1717        /* resized? */
1718        set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1719
1720        down_write(&rbd_dev->header_rwsem);
1721
1722        snap_seq = rbd_dev->header.snapc->seq;
1723        if (rbd_dev->header.total_snaps &&
1724            rbd_dev->header.snapc->snaps[0] == snap_seq)
1725                /* pointing at the head, will need to follow that
1726                   if head moves */
1727                follow_seq = 1;
1728
1729        kfree(rbd_dev->header.snapc);
1730        kfree(rbd_dev->header.snap_names);
1731        kfree(rbd_dev->header.snap_sizes);
1732
1733        rbd_dev->header.total_snaps = h.total_snaps;
1734        rbd_dev->header.snapc = h.snapc;
1735        rbd_dev->header.snap_names = h.snap_names;
1736        rbd_dev->header.snap_names_len = h.snap_names_len;
1737        rbd_dev->header.snap_sizes = h.snap_sizes;
1738        if (follow_seq)
1739                rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1740        else
1741                rbd_dev->header.snapc->seq = snap_seq;
1742
1743        ret = __rbd_init_snaps_header(rbd_dev);
1744
1745        up_write(&rbd_dev->header_rwsem);
1746
1747        return ret;
1748}
1749
1750static int rbd_init_disk(struct rbd_device *rbd_dev)
1751{
1752        struct gendisk *disk;
1753        struct request_queue *q;
1754        int rc;
1755        u64 segment_size;
1756        u64 total_size = 0;
1757
1758        /* contact OSD, request size info about the object being mapped */
1759        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1760        if (rc)
1761                return rc;
1762
1763        /* no need to lock here, as rbd_dev is not registered yet */
1764        rc = __rbd_init_snaps_header(rbd_dev);
1765        if (rc)
1766                return rc;
1767
1768        rc = rbd_header_set_snap(rbd_dev, &total_size);
1769        if (rc)
1770                return rc;
1771
1772        /* create gendisk info */
1773        rc = -ENOMEM;
1774        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1775        if (!disk)
1776                goto out;
1777
1778        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1779                 rbd_dev->id);
1780        disk->major = rbd_dev->major;
1781        disk->first_minor = 0;
1782        disk->fops = &rbd_bd_ops;
1783        disk->private_data = rbd_dev;
1784
1785        /* init rq */
1786        rc = -ENOMEM;
1787        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1788        if (!q)
1789                goto out_disk;
1790
1791        /* We use the default size, but let's be explicit about it. */
1792        blk_queue_physical_block_size(q, SECTOR_SIZE);
1793
1794        /* set io sizes to object size */
1795        segment_size = rbd_obj_bytes(&rbd_dev->header);
1796        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1797        blk_queue_max_segment_size(q, segment_size);
1798        blk_queue_io_min(q, segment_size);
1799        blk_queue_io_opt(q, segment_size);
1800
1801        blk_queue_merge_bvec(q, rbd_merge_bvec);
1802        disk->queue = q;
1803
1804        q->queuedata = rbd_dev;
1805
1806        rbd_dev->disk = disk;
1807        rbd_dev->q = q;
1808
1809        /* finally, announce the disk to the world */
1810        set_capacity(disk, total_size / SECTOR_SIZE);
1811        add_disk(disk);
1812
1813        pr_info("%s: added with size 0x%llx\n",
1814                disk->disk_name, (unsigned long long)total_size);
1815        return 0;
1816
1817out_disk:
1818        put_disk(disk);
1819out:
1820        return rc;
1821}
1822
1823/*
1824  sysfs
1825*/
1826
1827static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1828{
1829        return container_of(dev, struct rbd_device, dev);
1830}
1831
1832static ssize_t rbd_size_show(struct device *dev,
1833                             struct device_attribute *attr, char *buf)
1834{
1835        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1836
1837        return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1838}
1839
1840static ssize_t rbd_major_show(struct device *dev,
1841                              struct device_attribute *attr, char *buf)
1842{
1843        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1844
1845        return sprintf(buf, "%d\n", rbd_dev->major);
1846}
1847
1848static ssize_t rbd_client_id_show(struct device *dev,
1849                                  struct device_attribute *attr, char *buf)
1850{
1851        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1852
1853        return sprintf(buf, "client%lld\n",
1854                        ceph_client_id(rbd_dev->rbd_client->client));
1855}
1856
1857static ssize_t rbd_pool_show(struct device *dev,
1858                             struct device_attribute *attr, char *buf)
1859{
1860        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862        return sprintf(buf, "%s\n", rbd_dev->pool_name);
1863}
1864
1865static ssize_t rbd_name_show(struct device *dev,
1866                             struct device_attribute *attr, char *buf)
1867{
1868        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870        return sprintf(buf, "%s\n", rbd_dev->obj);
1871}
1872
1873static ssize_t rbd_snap_show(struct device *dev,
1874                             struct device_attribute *attr,
1875                             char *buf)
1876{
1877        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879        return sprintf(buf, "%s\n", rbd_dev->snap_name);
1880}
1881
1882static ssize_t rbd_image_refresh(struct device *dev,
1883                                 struct device_attribute *attr,
1884                                 const char *buf,
1885                                 size_t size)
1886{
1887        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888        int rc;
1889        int ret = size;
1890
1891        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1892
1893        rc = __rbd_update_snaps(rbd_dev);
1894        if (rc < 0)
1895                ret = rc;
1896
1897        mutex_unlock(&ctl_mutex);
1898        return ret;
1899}
1900
1901static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1902static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1903static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1904static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1905static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1906static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1907static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1908static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1909
1910static struct attribute *rbd_attrs[] = {
1911        &dev_attr_size.attr,
1912        &dev_attr_major.attr,
1913        &dev_attr_client_id.attr,
1914        &dev_attr_pool.attr,
1915        &dev_attr_name.attr,
1916        &dev_attr_current_snap.attr,
1917        &dev_attr_refresh.attr,
1918        &dev_attr_create_snap.attr,
1919        NULL
1920};
1921
1922static struct attribute_group rbd_attr_group = {
1923        .attrs = rbd_attrs,
1924};
1925
1926static const struct attribute_group *rbd_attr_groups[] = {
1927        &rbd_attr_group,
1928        NULL
1929};
1930
1931static void rbd_sysfs_dev_release(struct device *dev)
1932{
1933}
1934
1935static struct device_type rbd_device_type = {
1936        .name           = "rbd",
1937        .groups         = rbd_attr_groups,
1938        .release        = rbd_sysfs_dev_release,
1939};
1940
1941
1942/*
1943  sysfs - snapshots
1944*/
1945
1946static ssize_t rbd_snap_size_show(struct device *dev,
1947                                  struct device_attribute *attr,
1948                                  char *buf)
1949{
1950        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1951
1952        return sprintf(buf, "%zd\n", snap->size);
1953}
1954
1955static ssize_t rbd_snap_id_show(struct device *dev,
1956                                struct device_attribute *attr,
1957                                char *buf)
1958{
1959        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1960
1961        return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1962}
1963
1964static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1965static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1966
1967static struct attribute *rbd_snap_attrs[] = {
1968        &dev_attr_snap_size.attr,
1969        &dev_attr_snap_id.attr,
1970        NULL,
1971};
1972
1973static struct attribute_group rbd_snap_attr_group = {
1974        .attrs = rbd_snap_attrs,
1975};
1976
1977static void rbd_snap_dev_release(struct device *dev)
1978{
1979        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1980        kfree(snap->name);
1981        kfree(snap);
1982}
1983
1984static const struct attribute_group *rbd_snap_attr_groups[] = {
1985        &rbd_snap_attr_group,
1986        NULL
1987};
1988
1989static struct device_type rbd_snap_device_type = {
1990        .groups         = rbd_snap_attr_groups,
1991        .release        = rbd_snap_dev_release,
1992};
1993
1994static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1995                                  struct rbd_snap *snap)
1996{
1997        list_del(&snap->node);
1998        device_unregister(&snap->dev);
1999}
2000
2001static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2002                                  struct rbd_snap *snap,
2003                                  struct device *parent)
2004{
2005        struct device *dev = &snap->dev;
2006        int ret;
2007
2008        dev->type = &rbd_snap_device_type;
2009        dev->parent = parent;
2010        dev->release = rbd_snap_dev_release;
2011        dev_set_name(dev, "snap_%s", snap->name);
2012        ret = device_register(dev);
2013
2014        return ret;
2015}
2016
2017static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2018                              int i, const char *name,
2019                              struct rbd_snap **snapp)
2020{
2021        int ret;
2022        struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2023        if (!snap)
2024                return -ENOMEM;
2025        snap->name = kstrdup(name, GFP_KERNEL);
2026        snap->size = rbd_dev->header.snap_sizes[i];
2027        snap->id = rbd_dev->header.snapc->snaps[i];
2028        if (device_is_registered(&rbd_dev->dev)) {
2029                ret = rbd_register_snap_dev(rbd_dev, snap,
2030                                             &rbd_dev->dev);
2031                if (ret < 0)
2032                        goto err;
2033        }
2034        *snapp = snap;
2035        return 0;
2036err:
2037        kfree(snap->name);
2038        kfree(snap);
2039        return ret;
2040}
2041
2042/*
2043 * search for the previous snap in a null delimited string list
2044 */
2045const char *rbd_prev_snap_name(const char *name, const char *start)
2046{
2047        if (name < start + 2)
2048                return NULL;
2049
2050        name -= 2;
2051        while (*name) {
2052                if (name == start)
2053                        return start;
2054                name--;
2055        }
2056        return name + 1;
2057}
2058
2059/*
2060 * compare the old list of snapshots that we have to what's in the header
2061 * and update it accordingly. Note that the header holds the snapshots
2062 * in a reverse order (from newest to oldest) and we need to go from
2063 * older to new so that we don't get a duplicate snap name when
2064 * doing the process (e.g., removed snapshot and recreated a new
2065 * one with the same name.
2066 */
2067static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2068{
2069        const char *name, *first_name;
2070        int i = rbd_dev->header.total_snaps;
2071        struct rbd_snap *snap, *old_snap = NULL;
2072        int ret;
2073        struct list_head *p, *n;
2074
2075        first_name = rbd_dev->header.snap_names;
2076        name = first_name + rbd_dev->header.snap_names_len;
2077
2078        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2079                u64 cur_id;
2080
2081                old_snap = list_entry(p, struct rbd_snap, node);
2082
2083                if (i)
2084                        cur_id = rbd_dev->header.snapc->snaps[i - 1];
2085
2086                if (!i || old_snap->id < cur_id) {
2087                        /* old_snap->id was skipped, thus was removed */
2088                        __rbd_remove_snap_dev(rbd_dev, old_snap);
2089                        continue;
2090                }
2091                if (old_snap->id == cur_id) {
2092                        /* we have this snapshot already */
2093                        i--;
2094                        name = rbd_prev_snap_name(name, first_name);
2095                        continue;
2096                }
2097                for (; i > 0;
2098                     i--, name = rbd_prev_snap_name(name, first_name)) {
2099                        if (!name) {
2100                                WARN_ON(1);
2101                                return -EINVAL;
2102                        }
2103                        cur_id = rbd_dev->header.snapc->snaps[i];
2104                        /* snapshot removal? handle it above */
2105                        if (cur_id >= old_snap->id)
2106                                break;
2107                        /* a new snapshot */
2108                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2109                        if (ret < 0)
2110                                return ret;
2111
2112                        /* note that we add it backward so using n and not p */
2113                        list_add(&snap->node, n);
2114                        p = &snap->node;
2115                }
2116        }
2117        /* we're done going over the old snap list, just add what's left */
2118        for (; i > 0; i--) {
2119                name = rbd_prev_snap_name(name, first_name);
2120                if (!name) {
2121                        WARN_ON(1);
2122                        return -EINVAL;
2123                }
2124                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2125                if (ret < 0)
2126                        return ret;
2127                list_add(&snap->node, &rbd_dev->snaps);
2128        }
2129
2130        return 0;
2131}
2132
2133static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2134{
2135        int ret;
2136        struct device *dev;
2137        struct rbd_snap *snap;
2138
2139        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2140        dev = &rbd_dev->dev;
2141
2142        dev->bus = &rbd_bus_type;
2143        dev->type = &rbd_device_type;
2144        dev->parent = &rbd_root_dev;
2145        dev->release = rbd_dev_release;
2146        dev_set_name(dev, "%d", rbd_dev->id);
2147        ret = device_register(dev);
2148        if (ret < 0)
2149                goto out;
2150
2151        list_for_each_entry(snap, &rbd_dev->snaps, node) {
2152                ret = rbd_register_snap_dev(rbd_dev, snap,
2153                                             &rbd_dev->dev);
2154                if (ret < 0)
2155                        break;
2156        }
2157out:
2158        mutex_unlock(&ctl_mutex);
2159        return ret;
2160}
2161
2162static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2163{
2164        device_unregister(&rbd_dev->dev);
2165}
2166
2167static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2168{
2169        int ret, rc;
2170
2171        do {
2172                ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2173                                         rbd_dev->header.obj_version);
2174                if (ret == -ERANGE) {
2175                        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2176                        rc = __rbd_update_snaps(rbd_dev);
2177                        mutex_unlock(&ctl_mutex);
2178                        if (rc < 0)
2179                                return rc;
2180                }
2181        } while (ret == -ERANGE);
2182
2183        return ret;
2184}
2185
2186static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2187
2188/*
2189 * Get a unique rbd identifier for the given new rbd_dev, and add
2190 * the rbd_dev to the global list.  The minimum rbd id is 1.
2191 */
2192static void rbd_id_get(struct rbd_device *rbd_dev)
2193{
2194        rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2195
2196        spin_lock(&rbd_dev_list_lock);
2197        list_add_tail(&rbd_dev->node, &rbd_dev_list);
2198        spin_unlock(&rbd_dev_list_lock);
2199}
2200
2201/*
2202 * Remove an rbd_dev from the global list, and record that its
2203 * identifier is no longer in use.
2204 */
2205static void rbd_id_put(struct rbd_device *rbd_dev)
2206{
2207        struct list_head *tmp;
2208        int rbd_id = rbd_dev->id;
2209        int max_id;
2210
2211        BUG_ON(rbd_id < 1);
2212
2213        spin_lock(&rbd_dev_list_lock);
2214        list_del_init(&rbd_dev->node);
2215
2216        /*
2217         * If the id being "put" is not the current maximum, there
2218         * is nothing special we need to do.
2219         */
2220        if (rbd_id != atomic64_read(&rbd_id_max)) {
2221                spin_unlock(&rbd_dev_list_lock);
2222                return;
2223        }
2224
2225        /*
2226         * We need to update the current maximum id.  Search the
2227         * list to find out what it is.  We're more likely to find
2228         * the maximum at the end, so search the list backward.
2229         */
2230        max_id = 0;
2231        list_for_each_prev(tmp, &rbd_dev_list) {
2232                struct rbd_device *rbd_dev;
2233
2234                rbd_dev = list_entry(tmp, struct rbd_device, node);
2235                if (rbd_id > max_id)
2236                        max_id = rbd_id;
2237        }
2238        spin_unlock(&rbd_dev_list_lock);
2239
2240        /*
2241         * The max id could have been updated by rbd_id_get(), in
2242         * which case it now accurately reflects the new maximum.
2243         * Be careful not to overwrite the maximum value in that
2244         * case.
2245         */
2246        atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2247}
2248
2249/*
2250 * Skips over white space at *buf, and updates *buf to point to the
2251 * first found non-space character (if any). Returns the length of
2252 * the token (string of non-white space characters) found.  Note
2253 * that *buf must be terminated with '\0'.
2254 */
2255static inline size_t next_token(const char **buf)
2256{
2257        /*
2258        * These are the characters that produce nonzero for
2259        * isspace() in the "C" and "POSIX" locales.
2260        */
2261        const char *spaces = " \f\n\r\t\v";
2262
2263        *buf += strspn(*buf, spaces);   /* Find start of token */
2264
2265        return strcspn(*buf, spaces);   /* Return token length */
2266}
2267
2268/*
2269 * Finds the next token in *buf, and if the provided token buffer is
2270 * big enough, copies the found token into it.  The result, if
2271 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2272 * must be terminated with '\0' on entry.
2273 *
2274 * Returns the length of the token found (not including the '\0').
2275 * Return value will be 0 if no token is found, and it will be >=
2276 * token_size if the token would not fit.
2277 *
2278 * The *buf pointer will be updated to point beyond the end of the
2279 * found token.  Note that this occurs even if the token buffer is
2280 * too small to hold it.
2281 */
2282static inline size_t copy_token(const char **buf,
2283                                char *token,
2284                                size_t token_size)
2285{
2286        size_t len;
2287
2288        len = next_token(buf);
2289        if (len < token_size) {
2290                memcpy(token, *buf, len);
2291                *(token + len) = '\0';
2292        }
2293        *buf += len;
2294
2295        return len;
2296}
2297
2298/*
2299 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2300 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2301 * on the list of monitor addresses and other options provided via
2302 * /sys/bus/rbd/add.
2303 */
2304static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2305                              const char *buf,
2306                              const char **mon_addrs,
2307                              size_t *mon_addrs_size,
2308                              char *options,
2309                              size_t options_size)
2310{
2311        size_t  len;
2312
2313        /* The first four tokens are required */
2314
2315        len = next_token(&buf);
2316        if (!len)
2317                return -EINVAL;
2318        *mon_addrs_size = len + 1;
2319        *mon_addrs = buf;
2320
2321        buf += len;
2322
2323        len = copy_token(&buf, options, options_size);
2324        if (!len || len >= options_size)
2325                return -EINVAL;
2326
2327        len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2328        if (!len || len >= sizeof (rbd_dev->pool_name))
2329                return -EINVAL;
2330
2331        len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2332        if (!len || len >= sizeof (rbd_dev->obj))
2333                return -EINVAL;
2334
2335        /* We have the object length in hand, save it. */
2336
2337        rbd_dev->obj_len = len;
2338
2339        BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2340                                < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2341        sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2342
2343        /*
2344         * The snapshot name is optional, but it's an error if it's
2345         * too long.  If no snapshot is supplied, fill in the default.
2346         */
2347        len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2348        if (!len)
2349                memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2350                        sizeof (RBD_SNAP_HEAD_NAME));
2351        else if (len >= sizeof (rbd_dev->snap_name))
2352                return -EINVAL;
2353
2354        return 0;
2355}
2356
2357static ssize_t rbd_add(struct bus_type *bus,
2358                       const char *buf,
2359                       size_t count)
2360{
2361        struct rbd_device *rbd_dev;
2362        const char *mon_addrs = NULL;
2363        size_t mon_addrs_size = 0;
2364        char *options = NULL;
2365        struct ceph_osd_client *osdc;
2366        int rc = -ENOMEM;
2367
2368        if (!try_module_get(THIS_MODULE))
2369                return -ENODEV;
2370
2371        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2372        if (!rbd_dev)
2373                goto err_nomem;
2374        options = kmalloc(count, GFP_KERNEL);
2375        if (!options)
2376                goto err_nomem;
2377
2378        /* static rbd_device initialization */
2379        spin_lock_init(&rbd_dev->lock);
2380        INIT_LIST_HEAD(&rbd_dev->node);
2381        INIT_LIST_HEAD(&rbd_dev->snaps);
2382        init_rwsem(&rbd_dev->header_rwsem);
2383
2384        init_rwsem(&rbd_dev->header_rwsem);
2385
2386        /* generate unique id: find highest unique id, add one */
2387        rbd_id_get(rbd_dev);
2388
2389        /* Fill in the device name, now that we have its id. */
2390        BUILD_BUG_ON(DEV_NAME_LEN
2391                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2392        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2393
2394        /* parse add command */
2395        rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2396                                options, count);
2397        if (rc)
2398                goto err_put_id;
2399
2400        rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2401                                                options);
2402        if (IS_ERR(rbd_dev->rbd_client)) {
2403                rc = PTR_ERR(rbd_dev->rbd_client);
2404                goto err_put_id;
2405        }
2406
2407        /* pick the pool */
2408        osdc = &rbd_dev->rbd_client->client->osdc;
2409        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2410        if (rc < 0)
2411                goto err_out_client;
2412        rbd_dev->poolid = rc;
2413
2414        /* register our block device */
2415        rc = register_blkdev(0, rbd_dev->name);
2416        if (rc < 0)
2417                goto err_out_client;
2418        rbd_dev->major = rc;
2419
2420        rc = rbd_bus_add_dev(rbd_dev);
2421        if (rc)
2422                goto err_out_blkdev;
2423
2424        /*
2425         * At this point cleanup in the event of an error is the job
2426         * of the sysfs code (initiated by rbd_bus_del_dev()).
2427         *
2428         * Set up and announce blkdev mapping.
2429         */
2430        rc = rbd_init_disk(rbd_dev);
2431        if (rc)
2432                goto err_out_bus;
2433
2434        rc = rbd_init_watch_dev(rbd_dev);
2435        if (rc)
2436                goto err_out_bus;
2437
2438        return count;
2439
2440err_out_bus:
2441        /* this will also clean up rest of rbd_dev stuff */
2442
2443        rbd_bus_del_dev(rbd_dev);
2444        kfree(options);
2445        return rc;
2446
2447err_out_blkdev:
2448        unregister_blkdev(rbd_dev->major, rbd_dev->name);
2449err_out_client:
2450        rbd_put_client(rbd_dev);
2451err_put_id:
2452        rbd_id_put(rbd_dev);
2453err_nomem:
2454        kfree(options);
2455        kfree(rbd_dev);
2456
2457        dout("Error adding device %s\n", buf);
2458        module_put(THIS_MODULE);
2459
2460        return (ssize_t) rc;
2461}
2462
2463static struct rbd_device *__rbd_get_dev(unsigned long id)
2464{
2465        struct list_head *tmp;
2466        struct rbd_device *rbd_dev;
2467
2468        spin_lock(&rbd_dev_list_lock);
2469        list_for_each(tmp, &rbd_dev_list) {
2470                rbd_dev = list_entry(tmp, struct rbd_device, node);
2471                if (rbd_dev->id == id) {
2472                        spin_unlock(&rbd_dev_list_lock);
2473                        return rbd_dev;
2474                }
2475        }
2476        spin_unlock(&rbd_dev_list_lock);
2477        return NULL;
2478}
2479
2480static void rbd_dev_release(struct device *dev)
2481{
2482        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2483
2484        if (rbd_dev->watch_request) {
2485                struct ceph_client *client = rbd_dev->rbd_client->client;
2486
2487                ceph_osdc_unregister_linger_request(&client->osdc,
2488                                                    rbd_dev->watch_request);
2489        }
2490        if (rbd_dev->watch_event)
2491                rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2492
2493        rbd_put_client(rbd_dev);
2494
2495        /* clean up and free blkdev */
2496        rbd_free_disk(rbd_dev);
2497        unregister_blkdev(rbd_dev->major, rbd_dev->name);
2498
2499        /* done with the id, and with the rbd_dev */
2500        rbd_id_put(rbd_dev);
2501        kfree(rbd_dev);
2502
2503        /* release module ref */
2504        module_put(THIS_MODULE);
2505}
2506
2507static ssize_t rbd_remove(struct bus_type *bus,
2508                          const char *buf,
2509                          size_t count)
2510{
2511        struct rbd_device *rbd_dev = NULL;
2512        int target_id, rc;
2513        unsigned long ul;
2514        int ret = count;
2515
2516        rc = strict_strtoul(buf, 10, &ul);
2517        if (rc)
2518                return rc;
2519
2520        /* convert to int; abort if we lost anything in the conversion */
2521        target_id = (int) ul;
2522        if (target_id != ul)
2523                return -EINVAL;
2524
2525        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2526
2527        rbd_dev = __rbd_get_dev(target_id);
2528        if (!rbd_dev) {
2529                ret = -ENOENT;
2530                goto done;
2531        }
2532
2533        __rbd_remove_all_snaps(rbd_dev);
2534        rbd_bus_del_dev(rbd_dev);
2535
2536done:
2537        mutex_unlock(&ctl_mutex);
2538        return ret;
2539}
2540
2541static ssize_t rbd_snap_add(struct device *dev,
2542                            struct device_attribute *attr,
2543                            const char *buf,
2544                            size_t count)
2545{
2546        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2547        int ret;
2548        char *name = kmalloc(count + 1, GFP_KERNEL);
2549        if (!name)
2550                return -ENOMEM;
2551
2552        snprintf(name, count, "%s", buf);
2553
2554        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2555
2556        ret = rbd_header_add_snap(rbd_dev,
2557                                  name, GFP_KERNEL);
2558        if (ret < 0)
2559                goto err_unlock;
2560
2561        ret = __rbd_update_snaps(rbd_dev);
2562        if (ret < 0)
2563                goto err_unlock;
2564
2565        /* shouldn't hold ctl_mutex when notifying.. notify might
2566           trigger a watch callback that would need to get that mutex */
2567        mutex_unlock(&ctl_mutex);
2568
2569        /* make a best effort, don't error if failed */
2570        rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2571
2572        ret = count;
2573        kfree(name);
2574        return ret;
2575
2576err_unlock:
2577        mutex_unlock(&ctl_mutex);
2578        kfree(name);
2579        return ret;
2580}
2581
2582/*
2583 * create control files in sysfs
2584 * /sys/bus/rbd/...
2585 */
2586static int rbd_sysfs_init(void)
2587{
2588        int ret;
2589
2590        ret = device_register(&rbd_root_dev);
2591        if (ret < 0)
2592                return ret;
2593
2594        ret = bus_register(&rbd_bus_type);
2595        if (ret < 0)
2596                device_unregister(&rbd_root_dev);
2597
2598        return ret;
2599}
2600
2601static void rbd_sysfs_cleanup(void)
2602{
2603        bus_unregister(&rbd_bus_type);
2604        device_unregister(&rbd_root_dev);
2605}
2606
2607int __init rbd_init(void)
2608{
2609        int rc;
2610
2611        rc = rbd_sysfs_init();
2612        if (rc)
2613                return rc;
2614        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2615        return 0;
2616}
2617
2618void __exit rbd_exit(void)
2619{
2620        rbd_sysfs_cleanup();
2621}
2622
2623module_init(rbd_init);
2624module_exit(rbd_exit);
2625
2626MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2627MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2628MODULE_DESCRIPTION("rados block device");
2629
2630/* following authorship retained from original osdblk.c */
2631MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2632
2633MODULE_LICENSE("GPL");
2634