linux/drivers/block/rbd.c
<<
>>
Prefs
   1/*
   2   rbd.c -- Export ceph rados objects as a Linux block device
   3
   4
   5   based on drivers/block/osdblk.c:
   6
   7   Copyright 2009 Red Hat, Inc.
   8
   9   This program is free software; you can redistribute it and/or modify
  10   it under the terms of the GNU General Public License as published by
  11   the Free Software Foundation.
  12
  13   This program is distributed in the hope that it will be useful,
  14   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16   GNU General Public License for more details.
  17
  18   You should have received a copy of the GNU General Public License
  19   along with this program; see the file COPYING.  If not, write to
  20   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  21
  22
  23
  24   For usage instructions, please refer to:
  25
  26                 Documentation/ABI/testing/sysfs-bus-rbd
  27
  28 */
  29
  30#include <linux/ceph/libceph.h>
  31#include <linux/ceph/osd_client.h>
  32#include <linux/ceph/mon_client.h>
  33#include <linux/ceph/decode.h>
  34
  35#include <linux/kernel.h>
  36#include <linux/device.h>
  37#include <linux/module.h>
  38#include <linux/fs.h>
  39#include <linux/blkdev.h>
  40
  41#include "rbd_types.h"
  42
/* short and long driver names */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* buffer sizes for the name fields of struct rbd_device */
#define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX)) /* obj_md_name */
#define RBD_MAX_POOL_NAME_LEN   64              /* pool_name */
#define RBD_MAX_SNAP_NAME_LEN   32              /* snap_name */
#define RBD_MAX_OPT_LEN         1024    /* presumably max option string - confirm */

/* snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME      "-"

#define DEV_NAME_LEN            32              /* size of rbd_device.name */
  56
/*
 * block device image metadata (in-memory version)
 *
 * Filled in from the on-disk header by rbd_header_from_disk() and torn
 * down by rbd_header_free().
 */
struct rbd_image_header {
        u64 image_size;                 /* image size, from the on-disk header */
        char block_name[32];            /* prefix of the per-segment object names */
        __u8 obj_order;                 /* log2 of the segment (object) size */
        __u8 crypt_type;                /* copied verbatim from ondisk options */
        __u8 comp_type;                 /* copied verbatim from ondisk options */
        struct rw_semaphore snap_rwsem; /* held while snapshot state is used */
        struct ceph_snap_context *snapc; /* snapshot ids, one per snapshot */
        size_t snap_names_len;          /* size in bytes of snap_names */
        u64 snap_seq;                   /* snapshot sequence, from ondisk */
        u32 total_snaps;                /* number of snapshots */

        char *snap_names;               /* NUL-separated names, NULL if none */
        u64 *snap_sizes;                /* image size per snapshot, NULL if none */
};
  75
/*
 * an instance of the client.  multiple devices may share a client.
 *
 * Reference counted through kref; the last put runs rbd_client_release().
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying ceph client */
        struct kref             kref;           /* one ref per user device */
        struct list_head        node;           /* entry in rbd_client_list */
};
  84
/*
 * a single io request
 *
 * Allocated in rbd_do_request() and attached to the osd request through
 * r_priv; rbd_req_cb() reads it back and frees it on completion.
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* requested length in bytes */
};
  94
/*
 * a single snapshot of a device
 * NOTE(review): none of these fields are populated in this part of the
 * file; the per-field notes are inferred from names and types - confirm.
 */
struct rbd_snap {
        struct  device          dev;            /* embedded device (sysfs) */
        const char              *name;          /* snapshot name */
        size_t                  size;           /* presumably image size at snapshot */
        struct list_head        node;           /* presumably entry in rbd_device.snaps */
        u64                     id;             /* snapshot id */
};
 102
/*
 * a single device
 *
 * Reference counted through the embedded struct device (see
 * rbd_get_dev() / rbd_put_dev()).
 */
struct rbd_device {
        int                     id;             /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;

        struct ceph_client      *client;        /* same as rbd_client->client */
        struct rbd_client       *rbd_client;    /* (possibly shared) client */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;         /* in-memory image metadata */
        char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
        int                     obj_len;
        char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* header object name */
        char                    pool_name[RBD_MAX_POOL_NAME_LEN];
        int                     poolid;         /* placed into the osd request layout */

        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
        u32 cur_snap;   /* index+1 of current snapshot within snap context
                           0 - for the head */
        int read_only;  /* set when a snapshot is mapped, cleared for the head */

        struct list_head        node;           /* presumably entry in rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
 140
/* bus type named "rbd" */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
};

/*
 * NOTE(review): node_lock has no static initializer here; presumably
 * spin_lock_init() is called on it before first use - confirm.
 */
static spinlock_t node_lock;      /* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
 150
 151static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
 152static void rbd_dev_release(struct device *dev);
 153static ssize_t rbd_snap_rollback(struct device *dev,
 154                                 struct device_attribute *attr,
 155                                 const char *buf,
 156                                 size_t size);
 157static ssize_t rbd_snap_add(struct device *dev,
 158                            struct device_attribute *attr,
 159                            const char *buf,
 160                            size_t count);
 161static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
 162                                  struct rbd_snap *snap);;
 163
 164
 165static struct rbd_device *dev_to_rbd(struct device *dev)
 166{
 167        return container_of(dev, struct rbd_device, dev);
 168}
 169
 170static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 171{
 172        return get_device(&rbd_dev->dev);
 173}
 174
 175static void rbd_put_dev(struct rbd_device *rbd_dev)
 176{
 177        put_device(&rbd_dev->dev);
 178}
 179
 180static int rbd_open(struct block_device *bdev, fmode_t mode)
 181{
 182        struct gendisk *disk = bdev->bd_disk;
 183        struct rbd_device *rbd_dev = disk->private_data;
 184
 185        rbd_get_dev(rbd_dev);
 186
 187        set_device_ro(bdev, rbd_dev->read_only);
 188
 189        if ((mode & FMODE_WRITE) && rbd_dev->read_only)
 190                return -EROFS;
 191
 192        return 0;
 193}
 194
 195static int rbd_release(struct gendisk *disk, fmode_t mode)
 196{
 197        struct rbd_device *rbd_dev = disk->private_data;
 198
 199        rbd_put_dev(rbd_dev);
 200
 201        return 0;
 202}
 203
/*
 * Block device operations: only open and release are provided, and both
 * just manage the reference on the rbd device.
 */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
 209
 210/*
 211 * Initialize an rbd client instance.
 212 * We own *opt.
 213 */
 214static struct rbd_client *rbd_client_create(struct ceph_options *opt)
 215{
 216        struct rbd_client *rbdc;
 217        int ret = -ENOMEM;
 218
 219        dout("rbd_client_create\n");
 220        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 221        if (!rbdc)
 222                goto out_opt;
 223
 224        kref_init(&rbdc->kref);
 225        INIT_LIST_HEAD(&rbdc->node);
 226
 227        rbdc->client = ceph_create_client(opt, rbdc);
 228        if (IS_ERR(rbdc->client))
 229                goto out_rbdc;
 230        opt = NULL; /* Now rbdc->client is responsible for opt */
 231
 232        ret = ceph_open_session(rbdc->client);
 233        if (ret < 0)
 234                goto out_err;
 235
 236        spin_lock(&node_lock);
 237        list_add_tail(&rbdc->node, &rbd_client_list);
 238        spin_unlock(&node_lock);
 239
 240        dout("rbd_client_create created %p\n", rbdc);
 241        return rbdc;
 242
 243out_err:
 244        ceph_destroy_client(rbdc->client);
 245out_rbdc:
 246        kfree(rbdc);
 247out_opt:
 248        if (opt)
 249                ceph_destroy_options(opt);
 250        return ERR_PTR(ret);
 251}
 252
 253/*
 254 * Find a ceph client with specific addr and configuration.
 255 */
 256static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
 257{
 258        struct rbd_client *client_node;
 259
 260        if (opt->flags & CEPH_OPT_NOSHARE)
 261                return NULL;
 262
 263        list_for_each_entry(client_node, &rbd_client_list, node)
 264                if (ceph_compare_options(opt, client_node->client) == 0)
 265                        return client_node;
 266        return NULL;
 267}
 268
 269/*
 270 * Get a ceph client with specific addr and configuration, if one does
 271 * not exist create it.
 272 */
 273static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
 274                          char *options)
 275{
 276        struct rbd_client *rbdc;
 277        struct ceph_options *opt;
 278        int ret;
 279
 280        ret = ceph_parse_options(&opt, options, mon_addr,
 281                                 mon_addr + strlen(mon_addr), NULL, NULL);
 282        if (ret < 0)
 283                return ret;
 284
 285        spin_lock(&node_lock);
 286        rbdc = __rbd_client_find(opt);
 287        if (rbdc) {
 288                ceph_destroy_options(opt);
 289
 290                /* using an existing client */
 291                kref_get(&rbdc->kref);
 292                rbd_dev->rbd_client = rbdc;
 293                rbd_dev->client = rbdc->client;
 294                spin_unlock(&node_lock);
 295                return 0;
 296        }
 297        spin_unlock(&node_lock);
 298
 299        rbdc = rbd_client_create(opt);
 300        if (IS_ERR(rbdc))
 301                return PTR_ERR(rbdc);
 302
 303        rbd_dev->rbd_client = rbdc;
 304        rbd_dev->client = rbdc->client;
 305        return 0;
 306}
 307
 308/*
 309 * Destroy ceph client
 310 */
 311static void rbd_client_release(struct kref *kref)
 312{
 313        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 314
 315        dout("rbd_release_client %p\n", rbdc);
 316        spin_lock(&node_lock);
 317        list_del(&rbdc->node);
 318        spin_unlock(&node_lock);
 319
 320        ceph_destroy_client(rbdc->client);
 321        kfree(rbdc);
 322}
 323
 324/*
 325 * Drop reference to ceph client node. If it's not referenced anymore, release
 326 * it.
 327 */
 328static void rbd_put_client(struct rbd_device *rbd_dev)
 329{
 330        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
 331        rbd_dev->rbd_client = NULL;
 332        rbd_dev->client = NULL;
 333}
 334
 335
 336/*
 337 * Create a new header structure, translate header format from the on-disk
 338 * header.
 339 */
 340static int rbd_header_from_disk(struct rbd_image_header *header,
 341                                 struct rbd_image_header_ondisk *ondisk,
 342                                 int allocated_snaps,
 343                                 gfp_t gfp_flags)
 344{
 345        int i;
 346        u32 snap_count = le32_to_cpu(ondisk->snap_count);
 347        int ret = -ENOMEM;
 348
 349        init_rwsem(&header->snap_rwsem);
 350        header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 351        header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
 352                                snap_count *
 353                                 sizeof(struct rbd_image_snap_ondisk),
 354                                gfp_flags);
 355        if (!header->snapc)
 356                return -ENOMEM;
 357        if (snap_count) {
 358                header->snap_names = kmalloc(header->snap_names_len,
 359                                             GFP_KERNEL);
 360                if (!header->snap_names)
 361                        goto err_snapc;
 362                header->snap_sizes = kmalloc(snap_count * sizeof(u64),
 363                                             GFP_KERNEL);
 364                if (!header->snap_sizes)
 365                        goto err_names;
 366        } else {
 367                header->snap_names = NULL;
 368                header->snap_sizes = NULL;
 369        }
 370        memcpy(header->block_name, ondisk->block_name,
 371               sizeof(ondisk->block_name));
 372
 373        header->image_size = le64_to_cpu(ondisk->image_size);
 374        header->obj_order = ondisk->options.order;
 375        header->crypt_type = ondisk->options.crypt_type;
 376        header->comp_type = ondisk->options.comp_type;
 377
 378        atomic_set(&header->snapc->nref, 1);
 379        header->snap_seq = le64_to_cpu(ondisk->snap_seq);
 380        header->snapc->num_snaps = snap_count;
 381        header->total_snaps = snap_count;
 382
 383        if (snap_count &&
 384            allocated_snaps == snap_count) {
 385                for (i = 0; i < snap_count; i++) {
 386                        header->snapc->snaps[i] =
 387                                le64_to_cpu(ondisk->snaps[i].id);
 388                        header->snap_sizes[i] =
 389                                le64_to_cpu(ondisk->snaps[i].image_size);
 390                }
 391
 392                /* copy snapshot names */
 393                memcpy(header->snap_names, &ondisk->snaps[i],
 394                        header->snap_names_len);
 395        }
 396
 397        return 0;
 398
 399err_names:
 400        kfree(header->snap_names);
 401err_snapc:
 402        kfree(header->snapc);
 403        return ret;
 404}
 405
 406static int snap_index(struct rbd_image_header *header, int snap_num)
 407{
 408        return header->total_snaps - snap_num;
 409}
 410
 411static u64 cur_snap_id(struct rbd_device *rbd_dev)
 412{
 413        struct rbd_image_header *header = &rbd_dev->header;
 414
 415        if (!rbd_dev->cur_snap)
 416                return 0;
 417
 418        return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
 419}
 420
 421static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
 422                        u64 *seq, u64 *size)
 423{
 424        int i;
 425        char *p = header->snap_names;
 426
 427        for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
 428                if (strcmp(snap_name, p) == 0)
 429                        break;
 430        }
 431        if (i == header->total_snaps)
 432                return -ENOENT;
 433        if (seq)
 434                *seq = header->snapc->snaps[i];
 435
 436        if (size)
 437                *size = header->snap_sizes[i];
 438
 439        return i;
 440}
 441
 442static int rbd_header_set_snap(struct rbd_device *dev,
 443                               const char *snap_name,
 444                               u64 *size)
 445{
 446        struct rbd_image_header *header = &dev->header;
 447        struct ceph_snap_context *snapc = header->snapc;
 448        int ret = -ENOENT;
 449
 450        down_write(&header->snap_rwsem);
 451
 452        if (!snap_name ||
 453            !*snap_name ||
 454            strcmp(snap_name, "-") == 0 ||
 455            strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
 456                if (header->total_snaps)
 457                        snapc->seq = header->snap_seq;
 458                else
 459                        snapc->seq = 0;
 460                dev->cur_snap = 0;
 461                dev->read_only = 0;
 462                if (size)
 463                        *size = header->image_size;
 464        } else {
 465                ret = snap_by_name(header, snap_name, &snapc->seq, size);
 466                if (ret < 0)
 467                        goto done;
 468
 469                dev->cur_snap = header->total_snaps - ret;
 470                dev->read_only = 1;
 471        }
 472
 473        ret = 0;
 474done:
 475        up_write(&header->snap_rwsem);
 476        return ret;
 477}
 478
 479static void rbd_header_free(struct rbd_image_header *header)
 480{
 481        kfree(header->snapc);
 482        kfree(header->snap_names);
 483        kfree(header->snap_sizes);
 484}
 485
 486/*
 487 * get the actual striped segment name, offset and length
 488 */
 489static u64 rbd_get_segment(struct rbd_image_header *header,
 490                           const char *block_name,
 491                           u64 ofs, u64 len,
 492                           char *seg_name, u64 *segofs)
 493{
 494        u64 seg = ofs >> header->obj_order;
 495
 496        if (seg_name)
 497                snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
 498                         "%s.%012llx", block_name, seg);
 499
 500        ofs = ofs & ((1 << header->obj_order) - 1);
 501        len = min_t(u64, len, (1 << header->obj_order) - ofs);
 502
 503        if (segofs)
 504                *segofs = ofs;
 505
 506        return len;
 507}
 508
 509/*
 510 * bio helpers
 511 */
 512
 513static void bio_chain_put(struct bio *chain)
 514{
 515        struct bio *tmp;
 516
 517        while (chain) {
 518                tmp = chain;
 519                chain = chain->bi_next;
 520                bio_put(tmp);
 521        }
 522}
 523
 524/*
 525 * zeros a bio chain, starting at specific offset
 526 */
 527static void zero_bio_chain(struct bio *chain, int start_ofs)
 528{
 529        struct bio_vec *bv;
 530        unsigned long flags;
 531        void *buf;
 532        int i;
 533        int pos = 0;
 534
 535        while (chain) {
 536                bio_for_each_segment(bv, chain, i) {
 537                        if (pos + bv->bv_len > start_ofs) {
 538                                int remainder = max(start_ofs - pos, 0);
 539                                buf = bvec_kmap_irq(bv, &flags);
 540                                memset(buf + remainder, 0,
 541                                       bv->bv_len - remainder);
 542                                bvec_kunmap_irq(buf, &flags);
 543                        }
 544                        pos += bv->bv_len;
 545                }
 546
 547                chain = chain->bi_next;
 548        }
 549}
 550
 551/*
 552 * bio_chain_clone - clone a chain of bios up to a certain length.
 553 * might return a bio_pair that will need to be released.
 554 */
 555static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 556                                   struct bio_pair **bp,
 557                                   int len, gfp_t gfpmask)
 558{
 559        struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
 560        int total = 0;
 561
 562        if (*bp) {
 563                bio_pair_release(*bp);
 564                *bp = NULL;
 565        }
 566
 567        while (old_chain && (total < len)) {
 568                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
 569                if (!tmp)
 570                        goto err_out;
 571
 572                if (total + old_chain->bi_size > len) {
 573                        struct bio_pair *bp;
 574
 575                        /*
 576                         * this split can only happen with a single paged bio,
 577                         * split_bio will BUG_ON if this is not the case
 578                         */
 579                        dout("bio_chain_clone split! total=%d remaining=%d"
 580                             "bi_size=%d\n",
 581                             (int)total, (int)len-total,
 582                             (int)old_chain->bi_size);
 583
 584                        /* split the bio. We'll release it either in the next
 585                           call, or it will have to be released outside */
 586                        bp = bio_split(old_chain, (len - total) / 512ULL);
 587                        if (!bp)
 588                                goto err_out;
 589
 590                        __bio_clone(tmp, &bp->bio1);
 591
 592                        *next = &bp->bio2;
 593                } else {
 594                        __bio_clone(tmp, old_chain);
 595                        *next = old_chain->bi_next;
 596                }
 597
 598                tmp->bi_bdev = NULL;
 599                gfpmask &= ~__GFP_WAIT;
 600                tmp->bi_next = NULL;
 601
 602                if (!new_chain) {
 603                        new_chain = tail = tmp;
 604                } else {
 605                        tail->bi_next = tmp;
 606                        tail = tmp;
 607                }
 608                old_chain = old_chain->bi_next;
 609
 610                total += tmp->bi_size;
 611        }
 612
 613        BUG_ON(total < len);
 614
 615        if (tail)
 616                tail->bi_next = NULL;
 617
 618        *old = old_chain;
 619
 620        return new_chain;
 621
 622err_out:
 623        dout("bio_chain_clone with err\n");
 624        bio_chain_put(new_chain);
 625        return NULL;
 626}
 627
 628/*
 629 * helpers for osd request op vectors.
 630 */
 631static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
 632                            int num_ops,
 633                            int opcode,
 634                            u32 payload_len)
 635{
 636        *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
 637                       GFP_NOIO);
 638        if (!*ops)
 639                return -ENOMEM;
 640        (*ops)[0].op = opcode;
 641        /*
 642         * op extent offset and length will be set later on
 643         * in calc_raw_layout()
 644         */
 645        (*ops)[0].payload_len = payload_len;
 646        return 0;
 647}
 648
 649static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
 650{
 651        kfree(ops);
 652}
 653
 654/*
 655 * Send ceph osd request
 656 */
 657static int rbd_do_request(struct request *rq,
 658                          struct rbd_device *dev,
 659                          struct ceph_snap_context *snapc,
 660                          u64 snapid,
 661                          const char *obj, u64 ofs, u64 len,
 662                          struct bio *bio,
 663                          struct page **pages,
 664                          int num_pages,
 665                          int flags,
 666                          struct ceph_osd_req_op *ops,
 667                          int num_reply,
 668                          void (*rbd_cb)(struct ceph_osd_request *req,
 669                                         struct ceph_msg *msg))
 670{
 671        struct ceph_osd_request *req;
 672        struct ceph_file_layout *layout;
 673        int ret;
 674        u64 bno;
 675        struct timespec mtime = CURRENT_TIME;
 676        struct rbd_request *req_data;
 677        struct ceph_osd_request_head *reqhead;
 678        struct rbd_image_header *header = &dev->header;
 679
 680        ret = -ENOMEM;
 681        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
 682        if (!req_data)
 683                goto done;
 684
 685        dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
 686
 687        down_read(&header->snap_rwsem);
 688
 689        req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
 690                                      snapc,
 691                                      ops,
 692                                      false,
 693                                      GFP_NOIO, pages, bio);
 694        if (IS_ERR(req)) {
 695                up_read(&header->snap_rwsem);
 696                ret = PTR_ERR(req);
 697                goto done_pages;
 698        }
 699
 700        req->r_callback = rbd_cb;
 701
 702        req_data->rq = rq;
 703        req_data->bio = bio;
 704        req_data->pages = pages;
 705        req_data->len = len;
 706
 707        req->r_priv = req_data;
 708
 709        reqhead = req->r_request->front.iov_base;
 710        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 711
 712        strncpy(req->r_oid, obj, sizeof(req->r_oid));
 713        req->r_oid_len = strlen(req->r_oid);
 714
 715        layout = &req->r_file_layout;
 716        memset(layout, 0, sizeof(*layout));
 717        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 718        layout->fl_stripe_count = cpu_to_le32(1);
 719        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 720        layout->fl_pg_preferred = cpu_to_le32(-1);
 721        layout->fl_pg_pool = cpu_to_le32(dev->poolid);
 722        ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
 723                             ofs, &len, &bno, req, ops);
 724
 725        ceph_osdc_build_request(req, ofs, &len,
 726                                ops,
 727                                snapc,
 728                                &mtime,
 729                                req->r_oid, req->r_oid_len);
 730        up_read(&header->snap_rwsem);
 731
 732        ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
 733        if (ret < 0)
 734                goto done_err;
 735
 736        if (!rbd_cb) {
 737                ret = ceph_osdc_wait_request(&dev->client->osdc, req);
 738                ceph_osdc_put_request(req);
 739        }
 740        return ret;
 741
 742done_err:
 743        bio_chain_put(req_data->bio);
 744        ceph_osdc_put_request(req);
 745done_pages:
 746        kfree(req_data);
 747done:
 748        if (rq)
 749                blk_end_request(rq, ret, len);
 750        return ret;
 751}
 752
 753/*
 754 * Ceph osd op callback
 755 */
 756static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
 757{
 758        struct rbd_request *req_data = req->r_priv;
 759        struct ceph_osd_reply_head *replyhead;
 760        struct ceph_osd_op *op;
 761        __s32 rc;
 762        u64 bytes;
 763        int read_op;
 764
 765        /* parse reply */
 766        replyhead = msg->front.iov_base;
 767        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
 768        op = (void *)(replyhead + 1);
 769        rc = le32_to_cpu(replyhead->result);
 770        bytes = le64_to_cpu(op->extent.length);
 771        read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
 772
 773        dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
 774
 775        if (rc == -ENOENT && read_op) {
 776                zero_bio_chain(req_data->bio, 0);
 777                rc = 0;
 778        } else if (rc == 0 && read_op && bytes < req_data->len) {
 779                zero_bio_chain(req_data->bio, bytes);
 780                bytes = req_data->len;
 781        }
 782
 783        blk_end_request(req_data->rq, rc, bytes);
 784
 785        if (req_data->bio)
 786                bio_chain_put(req_data->bio);
 787
 788        ceph_osdc_put_request(req);
 789        kfree(req_data);
 790}
 791
 792/*
 793 * Do a synchronous ceph osd operation
 794 */
 795static int rbd_req_sync_op(struct rbd_device *dev,
 796                           struct ceph_snap_context *snapc,
 797                           u64 snapid,
 798                           int opcode,
 799                           int flags,
 800                           struct ceph_osd_req_op *orig_ops,
 801                           int num_reply,
 802                           const char *obj,
 803                           u64 ofs, u64 len,
 804                           char *buf)
 805{
 806        int ret;
 807        struct page **pages;
 808        int num_pages;
 809        struct ceph_osd_req_op *ops = orig_ops;
 810        u32 payload_len;
 811
 812        num_pages = calc_pages_for(ofs , len);
 813        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 814        if (IS_ERR(pages))
 815                return PTR_ERR(pages);
 816
 817        if (!orig_ops) {
 818                payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
 819                ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
 820                if (ret < 0)
 821                        goto done;
 822
 823                if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
 824                        ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
 825                        if (ret < 0)
 826                                goto done_ops;
 827                }
 828        }
 829
 830        ret = rbd_do_request(NULL, dev, snapc, snapid,
 831                          obj, ofs, len, NULL,
 832                          pages, num_pages,
 833                          flags,
 834                          ops,
 835                          2,
 836                          NULL);
 837        if (ret < 0)
 838                goto done_ops;
 839
 840        if ((flags & CEPH_OSD_FLAG_READ) && buf)
 841                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
 842
 843done_ops:
 844        if (!orig_ops)
 845                rbd_destroy_ops(ops);
 846done:
 847        ceph_release_page_vector(pages, num_pages);
 848        return ret;
 849}
 850
 851/*
 852 * Do an asynchronous ceph osd operation
 853 */
 854static int rbd_do_op(struct request *rq,
 855                     struct rbd_device *rbd_dev ,
 856                     struct ceph_snap_context *snapc,
 857                     u64 snapid,
 858                     int opcode, int flags, int num_reply,
 859                     u64 ofs, u64 len,
 860                     struct bio *bio)
 861{
 862        char *seg_name;
 863        u64 seg_ofs;
 864        u64 seg_len;
 865        int ret;
 866        struct ceph_osd_req_op *ops;
 867        u32 payload_len;
 868
 869        seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
 870        if (!seg_name)
 871                return -ENOMEM;
 872
 873        seg_len = rbd_get_segment(&rbd_dev->header,
 874                                  rbd_dev->header.block_name,
 875                                  ofs, len,
 876                                  seg_name, &seg_ofs);
 877
 878        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
 879
 880        ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
 881        if (ret < 0)
 882                goto done;
 883
 884        /* we've taken care of segment sizes earlier when we
 885           cloned the bios. We should never have a segment
 886           truncated at this point */
 887        BUG_ON(seg_len < len);
 888
 889        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
 890                             seg_name, seg_ofs, seg_len,
 891                             bio,
 892                             NULL, 0,
 893                             flags,
 894                             ops,
 895                             num_reply,
 896                             rbd_req_cb);
 897done:
 898        kfree(seg_name);
 899        return ret;
 900}
 901
 902/*
 903 * Request async osd write
 904 */
 905static int rbd_req_write(struct request *rq,
 906                         struct rbd_device *rbd_dev,
 907                         struct ceph_snap_context *snapc,
 908                         u64 ofs, u64 len,
 909                         struct bio *bio)
 910{
 911        return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
 912                         CEPH_OSD_OP_WRITE,
 913                         CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 914                         2,
 915                         ofs, len, bio);
 916}
 917
 918/*
 919 * Request async osd read
 920 */
 921static int rbd_req_read(struct request *rq,
 922                         struct rbd_device *rbd_dev,
 923                         u64 snapid,
 924                         u64 ofs, u64 len,
 925                         struct bio *bio)
 926{
 927        return rbd_do_op(rq, rbd_dev, NULL,
 928                         (snapid ? snapid : CEPH_NOSNAP),
 929                         CEPH_OSD_OP_READ,
 930                         CEPH_OSD_FLAG_READ,
 931                         2,
 932                         ofs, len, bio);
 933}
 934
 935/*
 936 * Request sync osd read
 937 */
 938static int rbd_req_sync_read(struct rbd_device *dev,
 939                          struct ceph_snap_context *snapc,
 940                          u64 snapid,
 941                          const char *obj,
 942                          u64 ofs, u64 len,
 943                          char *buf)
 944{
 945        return rbd_req_sync_op(dev, NULL,
 946                               (snapid ? snapid : CEPH_NOSNAP),
 947                               CEPH_OSD_OP_READ,
 948                               CEPH_OSD_FLAG_READ,
 949                               NULL,
 950                               1, obj, ofs, len, buf);
 951}
 952
 953/*
 954 * Request sync osd read
 955 */
 956static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
 957                                     u64 snapid,
 958                                     const char *obj)
 959{
 960        struct ceph_osd_req_op *ops;
 961        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
 962        if (ret < 0)
 963                return ret;
 964
 965        ops[0].snap.snapid = snapid;
 966
 967        ret = rbd_req_sync_op(dev, NULL,
 968                               CEPH_NOSNAP,
 969                               0,
 970                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 971                               ops,
 972                               1, obj, 0, 0, NULL);
 973
 974        rbd_destroy_ops(ops);
 975
 976        if (ret < 0)
 977                return ret;
 978
 979        return ret;
 980}
 981
 982/*
 983 * Request sync osd read
 984 */
 985static int rbd_req_sync_exec(struct rbd_device *dev,
 986                             const char *obj,
 987                             const char *cls,
 988                             const char *method,
 989                             const char *data,
 990                             int len)
 991{
 992        struct ceph_osd_req_op *ops;
 993        int cls_len = strlen(cls);
 994        int method_len = strlen(method);
 995        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
 996                                    cls_len + method_len + len);
 997        if (ret < 0)
 998                return ret;
 999
1000        ops[0].cls.class_name = cls;
1001        ops[0].cls.class_len = (__u8)cls_len;
1002        ops[0].cls.method_name = method;
1003        ops[0].cls.method_len = (__u8)method_len;
1004        ops[0].cls.argc = 0;
1005        ops[0].cls.indata = data;
1006        ops[0].cls.indata_len = len;
1007
1008        ret = rbd_req_sync_op(dev, NULL,
1009                               CEPH_NOSNAP,
1010                               0,
1011                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1012                               ops,
1013                               1, obj, 0, 0, NULL);
1014
1015        rbd_destroy_ops(ops);
1016
1017        dout("cls_exec returned %d\n", ret);
1018        return ret;
1019}
1020
/*
 * Block device queue callback.
 *
 * Drains the request queue: every fetched request is split into pieces
 * whose size is given by rbd_get_segment(), and each piece is submitted as
 * its own OSD read or write carrying a clone of the matching part of the
 * request's bio chain.
 *
 * q->queue_lock is dropped while the OSD requests for one block request
 * are built and retaken before the next request is fetched; presumably the
 * function is entered with that lock held (request_fn convention) -- TODO
 * confirm.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        /* shared across all requests handled by this invocation */
        struct bio_pair *bp = NULL;

        rq = blk_fetch_request(q);

        while (1) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                int size, op_size = 0;
                u64 ofs;

                /* stop once the queue has no more requests */
                if (!rq)
                        break;

                dout("fetched request\n");

                /* non-filesystem requests are completed with status 0 */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        goto next;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                /* request length in bytes and byte offset (512-byte sectors) */
                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * 512ULL;
                rq_bio = rq->bio;
                /* writes to a read-only mapping fail with -EROFS */
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        goto next;
                }

                /* the queue lock is not held while OSD requests are built */
                spin_unlock_irq(q->queue_lock);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, blk_rq_pos(rq) * 512ULL);

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
                        /* size of the next piece to submit */
                        op_size = rbd_get_segment(&rbd_dev->header,
                                                  rbd_dev->header.block_name,
                                                  ofs, size,
                                                  NULL, NULL);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                /*
                                 * NOTE(review): the whole request is failed
                                 * here even if earlier pieces were already
                                 * submitted, and bp is left as-is -- verify
                                 * against bio_chain_clone() and the
                                 * completion callback.
                                 */
                                spin_lock_irq(q->queue_lock);
                                __blk_end_request_all(rq, -ENOMEM);
                                goto next;
                        }

                        /*
                         * init OSD command: write or read
                         * NOTE(review): the return values of
                         * rbd_req_write()/rbd_req_read() are ignored.
                         */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              rbd_dev->header.snapc,
                                              ofs,
                                              op_size, bio);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             cur_snap_id(rbd_dev),
                                             ofs,
                                             op_size, bio);

                        /* advance past the piece just submitted */
                        size -= op_size;
                        ofs += op_size;

                        rq_bio = next_bio;
                } while (size > 0);

                /* bp is not reset to NULL after this release */
                if (bp)
                        bio_pair_release(bp);

                spin_lock_irq(q->queue_lock);
next:
                /* every path reaching this label holds q->queue_lock */
                rq = blk_fetch_request(q);
        }
}
1109
1110/*
1111 * a queue callback. Makes sure that we don't create a bio that spans across
1112 * multiple osd objects. One exception would be with a single page bios,
1113 * which we handle later at bio_chain_clone
1114 */
1115static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1116                          struct bio_vec *bvec)
1117{
1118        struct rbd_device *rbd_dev = q->queuedata;
1119        unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1120        sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1121        unsigned int bio_sectors = bmd->bi_size >> 9;
1122        int max;
1123
1124        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1125                                 + bio_sectors)) << 9;
1126        if (max < 0)
1127                max = 0; /* bio_add cannot handle a negative return */
1128        if (max <= bvec->bv_len && bio_sectors == 0)
1129                return bvec->bv_len;
1130        return max;
1131}
1132
1133static void rbd_free_disk(struct rbd_device *rbd_dev)
1134{
1135        struct gendisk *disk = rbd_dev->disk;
1136
1137        if (!disk)
1138                return;
1139
1140        rbd_header_free(&rbd_dev->header);
1141
1142        if (disk->flags & GENHD_FL_UP)
1143                del_gendisk(disk);
1144        if (disk->queue)
1145                blk_cleanup_queue(disk->queue);
1146        put_disk(disk);
1147}
1148
1149/*
1150 * reload the ondisk the header 
1151 */
1152static int rbd_read_header(struct rbd_device *rbd_dev,
1153                           struct rbd_image_header *header)
1154{
1155        ssize_t rc;
1156        struct rbd_image_header_ondisk *dh;
1157        int snap_count = 0;
1158        u64 snap_names_len = 0;
1159
1160        while (1) {
1161                int len = sizeof(*dh) +
1162                          snap_count * sizeof(struct rbd_image_snap_ondisk) +
1163                          snap_names_len;
1164
1165                rc = -ENOMEM;
1166                dh = kmalloc(len, GFP_KERNEL);
1167                if (!dh)
1168                        return -ENOMEM;
1169
1170                rc = rbd_req_sync_read(rbd_dev,
1171                                       NULL, CEPH_NOSNAP,
1172                                       rbd_dev->obj_md_name,
1173                                       0, len,
1174                                       (char *)dh);
1175                if (rc < 0)
1176                        goto out_dh;
1177
1178                rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1179                if (rc < 0)
1180                        goto out_dh;
1181
1182                if (snap_count != header->total_snaps) {
1183                        snap_count = header->total_snaps;
1184                        snap_names_len = header->snap_names_len;
1185                        rbd_header_free(header);
1186                        kfree(dh);
1187                        continue;
1188                }
1189                break;
1190        }
1191
1192out_dh:
1193        kfree(dh);
1194        return rc;
1195}
1196
1197/*
1198 * create a snapshot
1199 */
1200static int rbd_header_add_snap(struct rbd_device *dev,
1201                               const char *snap_name,
1202                               gfp_t gfp_flags)
1203{
1204        int name_len = strlen(snap_name);
1205        u64 new_snapid;
1206        int ret;
1207        void *data, *data_start, *data_end;
1208
1209        /* we should create a snapshot only if we're pointing at the head */
1210        if (dev->cur_snap)
1211                return -EINVAL;
1212
1213        ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1214                                      &new_snapid);
1215        dout("created snapid=%lld\n", new_snapid);
1216        if (ret < 0)
1217                return ret;
1218
1219        data = kmalloc(name_len + 16, gfp_flags);
1220        if (!data)
1221                return -ENOMEM;
1222
1223        data_start = data;
1224        data_end = data + name_len + 16;
1225
1226        ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1227        ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1228
1229        ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1230                                data_start, data - data_start);
1231
1232        kfree(data_start);
1233
1234        if (ret < 0)
1235                return ret;
1236
1237        dev->header.snapc->seq =  new_snapid;
1238
1239        return 0;
1240bad:
1241        return -ERANGE;
1242}
1243
1244static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1245{
1246        struct rbd_snap *snap;
1247
1248        while (!list_empty(&rbd_dev->snaps)) {
1249                snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1250                __rbd_remove_snap_dev(rbd_dev, snap);
1251        }
1252}
1253
1254/*
1255 * only read the first part of the ondisk header, without the snaps info
1256 */
1257static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1258{
1259        int ret;
1260        struct rbd_image_header h;
1261        u64 snap_seq;
1262
1263        ret = rbd_read_header(rbd_dev, &h);
1264        if (ret < 0)
1265                return ret;
1266
1267        down_write(&rbd_dev->header.snap_rwsem);
1268
1269        snap_seq = rbd_dev->header.snapc->seq;
1270
1271        kfree(rbd_dev->header.snapc);
1272        kfree(rbd_dev->header.snap_names);
1273        kfree(rbd_dev->header.snap_sizes);
1274
1275        rbd_dev->header.total_snaps = h.total_snaps;
1276        rbd_dev->header.snapc = h.snapc;
1277        rbd_dev->header.snap_names = h.snap_names;
1278        rbd_dev->header.snap_names_len = h.snap_names_len;
1279        rbd_dev->header.snap_sizes = h.snap_sizes;
1280        rbd_dev->header.snapc->seq = snap_seq;
1281
1282        ret = __rbd_init_snaps_header(rbd_dev);
1283
1284        up_write(&rbd_dev->header.snap_rwsem);
1285
1286        return ret;
1287}
1288
1289static int rbd_init_disk(struct rbd_device *rbd_dev)
1290{
1291        struct gendisk *disk;
1292        struct request_queue *q;
1293        int rc;
1294        u64 total_size = 0;
1295
1296        /* contact OSD, request size info about the object being mapped */
1297        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1298        if (rc)
1299                return rc;
1300
1301        /* no need to lock here, as rbd_dev is not registered yet */
1302        rc = __rbd_init_snaps_header(rbd_dev);
1303        if (rc)
1304                return rc;
1305
1306        rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1307        if (rc)
1308                return rc;
1309
1310        /* create gendisk info */
1311        rc = -ENOMEM;
1312        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1313        if (!disk)
1314                goto out;
1315
1316        sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1317        disk->major = rbd_dev->major;
1318        disk->first_minor = 0;
1319        disk->fops = &rbd_bd_ops;
1320        disk->private_data = rbd_dev;
1321
1322        /* init rq */
1323        rc = -ENOMEM;
1324        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1325        if (!q)
1326                goto out_disk;
1327        blk_queue_merge_bvec(q, rbd_merge_bvec);
1328        disk->queue = q;
1329
1330        q->queuedata = rbd_dev;
1331
1332        rbd_dev->disk = disk;
1333        rbd_dev->q = q;
1334
1335        /* finally, announce the disk to the world */
1336        set_capacity(disk, total_size / 512ULL);
1337        add_disk(disk);
1338
1339        pr_info("%s: added with size 0x%llx\n",
1340                disk->disk_name, (unsigned long long)total_size);
1341        return 0;
1342
1343out_disk:
1344        put_disk(disk);
1345out:
1346        return rc;
1347}
1348
1349/*
1350  sysfs
1351*/
1352
1353static ssize_t rbd_size_show(struct device *dev,
1354                             struct device_attribute *attr, char *buf)
1355{
1356        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1357
1358        return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1359}
1360
1361static ssize_t rbd_major_show(struct device *dev,
1362                              struct device_attribute *attr, char *buf)
1363{
1364        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1365
1366        return sprintf(buf, "%d\n", rbd_dev->major);
1367}
1368
1369static ssize_t rbd_client_id_show(struct device *dev,
1370                                  struct device_attribute *attr, char *buf)
1371{
1372        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1373
1374        return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1375}
1376
1377static ssize_t rbd_pool_show(struct device *dev,
1378                             struct device_attribute *attr, char *buf)
1379{
1380        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1381
1382        return sprintf(buf, "%s\n", rbd_dev->pool_name);
1383}
1384
1385static ssize_t rbd_name_show(struct device *dev,
1386                             struct device_attribute *attr, char *buf)
1387{
1388        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1389
1390        return sprintf(buf, "%s\n", rbd_dev->obj);
1391}
1392
1393static ssize_t rbd_snap_show(struct device *dev,
1394                             struct device_attribute *attr,
1395                             char *buf)
1396{
1397        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1398
1399        return sprintf(buf, "%s\n", rbd_dev->snap_name);
1400}
1401
1402static ssize_t rbd_image_refresh(struct device *dev,
1403                                 struct device_attribute *attr,
1404                                 const char *buf,
1405                                 size_t size)
1406{
1407        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1408        int rc;
1409        int ret = size;
1410
1411        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1412
1413        rc = __rbd_update_snaps(rbd_dev);
1414        if (rc < 0)
1415                ret = rc;
1416
1417        mutex_unlock(&ctl_mutex);
1418        return ret;
1419}
1420
/*
 * Per-device sysfs attributes.  The S_IRUGO entries are read-only and
 * only have a show callback; the S_IWUSR entries are write-only triggers
 * and only have a store callback.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1430
/* NULL-terminated list of all attributes exposed for an rbd device. */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        &dev_attr_rollback_snap.attr,
        NULL
};

/* Unnamed group wrapping rbd_attrs. */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type. */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
1452
/*
 * Release hook of rbd_device_type; intentionally empty.  Note that
 * rbd_bus_add_dev() additionally installs rbd_dev_release directly on
 * each device it registers.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type of an rbd device: supplies its sysfs attribute groups. */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1462
1463
1464/*
1465  sysfs - snapshots
1466*/
1467
1468static ssize_t rbd_snap_size_show(struct device *dev,
1469                                  struct device_attribute *attr,
1470                                  char *buf)
1471{
1472        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1473
1474        return sprintf(buf, "%lld\n", (long long)snap->size);
1475}
1476
1477static ssize_t rbd_snap_id_show(struct device *dev,
1478                                struct device_attribute *attr,
1479                                char *buf)
1480{
1481        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1482
1483        return sprintf(buf, "%lld\n", (long long)snap->id);
1484}
1485
/* Read-only attributes exposed for every snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the snapshot attributes. */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

/* Unnamed group wrapping rbd_snap_attrs. */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
1498
1499static void rbd_snap_dev_release(struct device *dev)
1500{
1501        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1502        kfree(snap->name);
1503        kfree(snap);
1504}
1505
/* NULL-terminated group list referenced by rbd_snap_device_type. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/*
 * Device type of a snapshot device: supplies its attribute groups and
 * frees the snapshot on release.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
1515
1516static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1517                                  struct rbd_snap *snap)
1518{
1519        list_del(&snap->node);
1520        device_unregister(&snap->dev);
1521}
1522
1523static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1524                                  struct rbd_snap *snap,
1525                                  struct device *parent)
1526{
1527        struct device *dev = &snap->dev;
1528        int ret;
1529
1530        dev->type = &rbd_snap_device_type;
1531        dev->parent = parent;
1532        dev->release = rbd_snap_dev_release;
1533        dev_set_name(dev, "snap_%s", snap->name);
1534        ret = device_register(dev);
1535
1536        return ret;
1537}
1538
1539static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1540                              int i, const char *name,
1541                              struct rbd_snap **snapp)
1542{
1543        int ret;
1544        struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1545        if (!snap)
1546                return -ENOMEM;
1547        snap->name = kstrdup(name, GFP_KERNEL);
1548        snap->size = rbd_dev->header.snap_sizes[i];
1549        snap->id = rbd_dev->header.snapc->snaps[i];
1550        if (device_is_registered(&rbd_dev->dev)) {
1551                ret = rbd_register_snap_dev(rbd_dev, snap,
1552                                             &rbd_dev->dev);
1553                if (ret < 0)
1554                        goto err;
1555        }
1556        *snapp = snap;
1557        return 0;
1558err:
1559        kfree(snap->name);
1560        kfree(snap);
1561        return ret;
1562}
1563
/*
 * Step backwards through a list of NUL-terminated names stored back to
 * back.
 *
 * @name points at the start of the current name (or just past the final
 * terminator of the list); @start is the start of the whole list.
 *
 * Returns the start of the name preceding @name, or NULL when fewer than
 * two bytes lie between @start and @name, i.e. there is no previous name.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
        const char *cur;

        if (name < start + 2)
                return NULL;

        /* skip the previous name's terminator, then scan back over it */
        for (cur = name - 2; *cur != '\0'; cur--) {
                if (cur == start)
                        return start;
        }

        /* cur sits on the terminator of the name before the previous one */
        return cur + 1;
}
1580
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly.  Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing so (e.g., a snapshot was removed and a new one with the same
 * name was created).
 *
 * Both the existing list and the header's snapshot array are therefore
 * walked backwards: i counts the header entries still to be matched and
 * name tracks the corresponding position in the packed name list.
 *
 * Returns 0 on success, -EINVAL if the name list runs out before the
 * snapshot count does, or the error from __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
        const char *name, *first_name;
        int i = rbd_dev->header.total_snaps;
        struct rbd_snap *snap, *old_snap = NULL;
        int ret;
        struct list_head *p, *n;

        /* name starts just past the end of the packed name list */
        first_name = rbd_dev->header.snap_names;
        name = first_name + rbd_dev->header.snap_names_len;

        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
                u64 cur_id;

                old_snap = list_entry(p, struct rbd_snap, node);

                /* cur_id is only assigned (and only read) while i != 0 */
                if (i)
                        cur_id = rbd_dev->header.snapc->snaps[i - 1];

                if (!i || old_snap->id < cur_id) {
                        /* old_snap->id was skipped, thus was removed */
                        __rbd_remove_snap_dev(rbd_dev, old_snap);
                        continue;
                }
                if (old_snap->id == cur_id) {
                        /* we have this snapshot already */
                        i--;
                        name = rbd_prev_snap_name(name, first_name);
                        continue;
                }
                for (; i > 0;
                     i--, name = rbd_prev_snap_name(name, first_name)) {
                        if (!name) {
                                WARN_ON(1);
                                return -EINVAL;
                        }
                        /*
                         * NOTE(review): this reads snaps[i] while the rest
                         * of the function uses snaps[i - 1] for the same
                         * position; with i == total_snaps that is one past
                         * the entries used elsewhere -- confirm intent.
                         */
                        cur_id = rbd_dev->header.snapc->snaps[i];
                        /* snapshot removal? handle it above */
                        if (cur_id >= old_snap->id)
                                break;
                        /* a new snapshot */
                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                        if (ret < 0)
                                return ret;

                        /* note that we add it backward so using n and not p */
                        list_add(&snap->node, n);
                        p = &snap->node;
                }
        }
        /* we're done going over the old snap list, just add what's left */
        for (; i > 0; i--) {
                name = rbd_prev_snap_name(name, first_name);
                if (!name) {
                        WARN_ON(1);
                        return -EINVAL;
                }
                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                if (ret < 0)
                        return ret;
                /* inserted at the head of the list */
                list_add(&snap->node, &rbd_dev->snaps);
        }

        return 0;
}
1654
1655
/* Release hook of the statically allocated root device; nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Root device "rbd"; rbd_bus_add_dev() makes it the parent of each device. */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
1664
1665static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1666{
1667        int ret = -ENOMEM;
1668        struct device *dev;
1669        struct rbd_snap *snap;
1670
1671        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1672        dev = &rbd_dev->dev;
1673
1674        dev->bus = &rbd_bus_type;
1675        dev->type = &rbd_device_type;
1676        dev->parent = &rbd_root_dev;
1677        dev->release = rbd_dev_release;
1678        dev_set_name(dev, "%d", rbd_dev->id);
1679        ret = device_register(dev);
1680        if (ret < 0)
1681                goto done_free;
1682
1683        list_for_each_entry(snap, &rbd_dev->snaps, node) {
1684                ret = rbd_register_snap_dev(rbd_dev, snap,
1685                                             &rbd_dev->dev);
1686                if (ret < 0)
1687                        break;
1688        }
1689
1690        mutex_unlock(&ctl_mutex);
1691        return 0;
1692done_free:
1693        mutex_unlock(&ctl_mutex);
1694        return ret;
1695}
1696
1697static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1698{
1699        device_unregister(&rbd_dev->dev);
1700}
1701
1702static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
1703{
1704        struct ceph_osd_client *osdc;
1705        struct rbd_device *rbd_dev;
1706        ssize_t rc = -ENOMEM;
1707        int irc, new_id = 0;
1708        struct list_head *tmp;
1709        char *mon_dev_name;
1710        char *options;
1711
1712        if (!try_module_get(THIS_MODULE))
1713                return -ENODEV;
1714
1715        mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1716        if (!mon_dev_name)
1717                goto err_out_mod;
1718
1719        options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1720        if (!options)
1721                goto err_mon_dev;
1722
1723        /* new rbd_device object */
1724        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1725        if (!rbd_dev)
1726                goto err_out_opt;
1727
1728        /* static rbd_device initialization */
1729        spin_lock_init(&rbd_dev->lock);
1730        INIT_LIST_HEAD(&rbd_dev->node);
1731        INIT_LIST_HEAD(&rbd_dev->snaps);
1732
1733        /* generate unique id: find highest unique id, add one */
1734        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1735
1736        list_for_each(tmp, &rbd_dev_list) {
1737                struct rbd_device *rbd_dev;
1738
1739                rbd_dev = list_entry(tmp, struct rbd_device, node);
1740                if (rbd_dev->id >= new_id)
1741                        new_id = rbd_dev->id + 1;
1742        }
1743
1744        rbd_dev->id = new_id;
1745
1746        /* add to global list */
1747        list_add_tail(&rbd_dev->node, &rbd_dev_list);
1748
1749        /* parse add command */
1750        if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1751                   "%" __stringify(RBD_MAX_OPT_LEN) "s "
1752                   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1753                   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1754                   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1755                   mon_dev_name, options, rbd_dev->pool_name,
1756                   rbd_dev->obj, rbd_dev->snap_name) < 4) {
1757                rc = -EINVAL;
1758                goto err_out_slot;
1759        }
1760
1761        if (rbd_dev->snap_name[0] == 0)
1762                rbd_dev->snap_name[0] = '-';
1763
1764        rbd_dev->obj_len = strlen(rbd_dev->obj);
1765        snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1766                 rbd_dev->obj, RBD_SUFFIX);
1767
1768        /* initialize rest of new object */
1769        snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1770        rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1771        if (rc < 0)
1772                goto err_out_slot;
1773
1774        mutex_unlock(&ctl_mutex);
1775
1776        /* pick the pool */
1777        osdc = &rbd_dev->client->osdc;
1778        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1779        if (rc < 0)
1780                goto err_out_client;
1781        rbd_dev->poolid = rc;
1782
1783        /* register our block device */
1784        irc = register_blkdev(0, rbd_dev->name);
1785        if (irc < 0) {
1786                rc = irc;
1787                goto err_out_client;
1788        }
1789        rbd_dev->major = irc;
1790
1791        rc = rbd_bus_add_dev(rbd_dev);
1792        if (rc)
1793                goto err_out_blkdev;
1794
1795        /* set up and announce blkdev mapping */
1796        rc = rbd_init_disk(rbd_dev);
1797        if (rc)
1798                goto err_out_bus;
1799
1800        return count;
1801
1802err_out_bus:
1803        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1804        list_del_init(&rbd_dev->node);
1805        mutex_unlock(&ctl_mutex);
1806
1807        /* this will also clean up rest of rbd_dev stuff */
1808
1809        rbd_bus_del_dev(rbd_dev);
1810        kfree(options);
1811        kfree(mon_dev_name);
1812        return rc;
1813
1814err_out_blkdev:
1815        unregister_blkdev(rbd_dev->major, rbd_dev->name);
1816err_out_client:
1817        rbd_put_client(rbd_dev);
1818        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1819err_out_slot:
1820        list_del_init(&rbd_dev->node);
1821        mutex_unlock(&ctl_mutex);
1822
1823        kfree(rbd_dev);
1824err_out_opt:
1825        kfree(options);
1826err_mon_dev:
1827        kfree(mon_dev_name);
1828err_out_mod:
1829        dout("Error adding device %s\n", buf);
1830        module_put(THIS_MODULE);
1831        return rc;
1832}
1833
1834static struct rbd_device *__rbd_get_dev(unsigned long id)
1835{
1836        struct list_head *tmp;
1837        struct rbd_device *rbd_dev;
1838
1839        list_for_each(tmp, &rbd_dev_list) {
1840                rbd_dev = list_entry(tmp, struct rbd_device, node);
1841                if (rbd_dev->id == id)
1842                        return rbd_dev;
1843        }
1844        return NULL;
1845}
1846
1847static void rbd_dev_release(struct device *dev)
1848{
1849        struct rbd_device *rbd_dev =
1850                        container_of(dev, struct rbd_device, dev);
1851
1852        rbd_put_client(rbd_dev);
1853
1854        /* clean up and free blkdev */
1855        rbd_free_disk(rbd_dev);
1856        unregister_blkdev(rbd_dev->major, rbd_dev->name);
1857        kfree(rbd_dev);
1858
1859        /* release module ref */
1860        module_put(THIS_MODULE);
1861}
1862
1863static ssize_t rbd_remove(struct bus_type *bus,
1864                          const char *buf,
1865                          size_t count)
1866{
1867        struct rbd_device *rbd_dev = NULL;
1868        int target_id, rc;
1869        unsigned long ul;
1870        int ret = count;
1871
1872        rc = strict_strtoul(buf, 10, &ul);
1873        if (rc)
1874                return rc;
1875
1876        /* convert to int; abort if we lost anything in the conversion */
1877        target_id = (int) ul;
1878        if (target_id != ul)
1879                return -EINVAL;
1880
1881        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1882
1883        rbd_dev = __rbd_get_dev(target_id);
1884        if (!rbd_dev) {
1885                ret = -ENOENT;
1886                goto done;
1887        }
1888
1889        list_del_init(&rbd_dev->node);
1890
1891        __rbd_remove_all_snaps(rbd_dev);
1892        rbd_bus_del_dev(rbd_dev);
1893
1894done:
1895        mutex_unlock(&ctl_mutex);
1896        return ret;
1897}
1898
1899static ssize_t rbd_snap_add(struct device *dev,
1900                            struct device_attribute *attr,
1901                            const char *buf,
1902                            size_t count)
1903{
1904        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1905        int ret;
1906        char *name = kmalloc(count + 1, GFP_KERNEL);
1907        if (!name)
1908                return -ENOMEM;
1909
1910        snprintf(name, count, "%s", buf);
1911
1912        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1913
1914        ret = rbd_header_add_snap(rbd_dev,
1915                                  name, GFP_KERNEL);
1916        if (ret < 0)
1917                goto done_unlock;
1918
1919        ret = __rbd_update_snaps(rbd_dev);
1920        if (ret < 0)
1921                goto done_unlock;
1922
1923        ret = count;
1924done_unlock:
1925        mutex_unlock(&ctl_mutex);
1926        kfree(name);
1927        return ret;
1928}
1929
1930static ssize_t rbd_snap_rollback(struct device *dev,
1931                                 struct device_attribute *attr,
1932                                 const char *buf,
1933                                 size_t count)
1934{
1935        struct rbd_device *rbd_dev = dev_to_rbd(dev);
1936        int ret;
1937        u64 snapid;
1938        u64 cur_ofs;
1939        char *seg_name = NULL;
1940        char *snap_name = kmalloc(count + 1, GFP_KERNEL);
1941        ret = -ENOMEM;
1942        if (!snap_name)
1943                return ret;
1944
1945        /* parse snaps add command */
1946        snprintf(snap_name, count, "%s", buf);
1947        seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1948        if (!seg_name)
1949                goto done;
1950
1951        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1952
1953        ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1954        if (ret < 0)
1955                goto done_unlock;
1956
1957        dout("snapid=%lld\n", snapid);
1958
1959        cur_ofs = 0;
1960        while (cur_ofs < rbd_dev->header.image_size) {
1961                cur_ofs += rbd_get_segment(&rbd_dev->header,
1962                                           rbd_dev->obj,
1963                                           cur_ofs, (u64)-1,
1964                                           seg_name, NULL);
1965                dout("seg_name=%s\n", seg_name);
1966
1967                ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1968                if (ret < 0)
1969                        pr_warning("could not roll back obj %s err=%d\n",
1970                                   seg_name, ret);
1971        }
1972
1973        ret = __rbd_update_snaps(rbd_dev);
1974        if (ret < 0)
1975                goto done_unlock;
1976
1977        ret = count;
1978
1979done_unlock:
1980        mutex_unlock(&ctl_mutex);
1981done:
1982        kfree(seg_name);
1983        kfree(snap_name);
1984
1985        return ret;
1986}
1987
1988static struct bus_attribute rbd_bus_attrs[] = {
1989        __ATTR(add, S_IWUSR, NULL, rbd_add),
1990        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
1991        __ATTR_NULL
1992};
1993
1994/*
1995 * create control files in sysfs
1996 * /sys/bus/rbd/...
1997 */
1998static int rbd_sysfs_init(void)
1999{
2000        int ret;
2001
2002        rbd_bus_type.bus_attrs = rbd_bus_attrs;
2003
2004        ret = bus_register(&rbd_bus_type);
2005         if (ret < 0)
2006                return ret;
2007
2008        ret = device_register(&rbd_root_dev);
2009
2010        return ret;
2011}
2012
2013static void rbd_sysfs_cleanup(void)
2014{
2015        device_unregister(&rbd_root_dev);
2016        bus_unregister(&rbd_bus_type);
2017}
2018
2019int __init rbd_init(void)
2020{
2021        int rc;
2022
2023        rc = rbd_sysfs_init();
2024        if (rc)
2025                return rc;
2026        spin_lock_init(&node_lock);
2027        pr_info("loaded " DRV_NAME_LONG "\n");
2028        return 0;
2029}
2030
2031void __exit rbd_exit(void)
2032{
2033        rbd_sysfs_cleanup();
2034}
2035
2036module_init(rbd_init);
2037module_exit(rbd_exit);
2038
2039MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2040MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2041MODULE_DESCRIPTION("rados block device");
2042
2043/* following authorship retained from original osdblk.c */
2044MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2045
2046MODULE_LICENSE("GPL");
2047