1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
34#include <linux/parser.h>
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44
45
46
47
48
49
/* Sector geometry used to convert block-layer positions to byte offsets. */
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

/* Minor numbers reserved per rbd device (partitions included). */
#define RBD_MINORS_PER_MAJOR 256

/* Bounds for name buffers (include space for terminating NUL/suffix). */
#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN 64
#define RBD_MAX_SNAP_NAME_LEN 32
#define RBD_MAX_OPT_LEN 1024

/* Sentinel snapshot name meaning "no snapshot: map the live (head) image". */
#define RBD_SNAP_HEAD_NAME "-"

/* Length of a block device name such as "rbd3". */
#define DEV_NAME_LEN 32
/* Worst-case decimal digits needed to print an int (plus sign). */
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)

/* Default for the "notify_timeout" mount option, in seconds. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75
76
77
78
/*
 * In-core rbd image header, decoded from the on-disk header object
 * by rbd_header_from_disk().
 */
struct rbd_image_header {
	u64 image_size;		/* total image size, in bytes */
	char block_name[32];	/* prefix for the data object names */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;	/* on-disk encryption type */
	__u8 comp_type;		/* on-disk compression type */
	struct ceph_snap_context *snapc; /* snapshot ids, newest first */
	size_t snap_names_len;	/* total bytes of NUL-separated names */
	u64 snap_seq;		/* snapshot seq at header read time */
	u32 total_snaps;	/* number of snapshots */

	char *snap_names;	/* packed NUL-terminated snapshot names */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;	/* header object version (watch/notify) */
};
95
/* Driver-specific mount options parsed by parse_rbd_opts_token(). */
struct rbd_options {
	int notify_timeout;	/* seconds; RBD_NOTIFY_TIMEOUT_DEFAULT */
};
99
100
101
102
/*
 * A shared, refcounted ceph client instance; devices mapped with
 * compatible options share one rbd_client (see __rbd_client_find()).
 */
struct rbd_client {
	struct ceph_client *client;	/* underlying libceph client */
	struct rbd_options *rbd_opts;	/* owned; freed in release */
	struct kref kref;		/* refcount; rbd_client_release() */
	struct list_head node;		/* entry on rbd_client_list */
};
109
110
111
112
/* Completion status of a single segment request within a collection. */
struct rbd_req_status {
	int done;	/* nonzero once the segment request completed */
	int rc;		/* completion result */
	u64 bytes;	/* bytes transferred */
};
118
119
120
121
/*
 * Tracks the per-segment completions of one block-layer request so the
 * request can be ended in order (see rbd_coll_end_req_index()).
 */
struct rbd_req_coll {
	int total;			/* number of segment requests */
	int num_done;			/* completed-in-order count */
	struct kref kref;		/* one ref per outstanding segment */
	struct rbd_req_status status[0];/* trailing per-segment status */
};
128
129
130
131
/* Per-OSD-request context, attached via req->r_priv in rbd_do_request(). */
struct rbd_request {
	struct request *rq;		/* originating block-layer request */
	struct bio *bio;		/* cloned bio chain for this segment */
	struct page **pages;		/* page vector (sync ops) */
	u64 len;			/* request length, bytes */
	int coll_index;			/* slot in the collection */
	struct rbd_req_coll *coll;	/* collection, NULL for sync ops */
};
140
/* One snapshot of a mapped image, exposed as a sysfs child device. */
struct rbd_snap {
	struct device dev;	/* sysfs representation */
	const char *name;	/* snapshot name */
	u64 size;		/* image size at this snapshot */
	struct list_head node;	/* entry on rbd_device.snaps */
	u64 id;			/* snapshot id */
};
148
149
150
151
/*
 * Per-mapped-device state: one rbd_device per /dev/rbdN.
 */
struct rbd_device {
	int id;			/* unique device id (also minor base) */

	int major;		/* assigned block major */
	struct gendisk *disk;	/* gendisk backing the block device */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* shared ceph client handle */

	char name[DEV_NAME_LEN];	/* block device name, e.g. "rbd3" */

	spinlock_t lock;	/* request queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN];		/* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN];	/* header object name */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	struct ceph_osd_event *watch_event;	/* header watch */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	char snap_name[RBD_MAX_SNAP_NAME_LEN];	/* mapped snap ("-" = head) */
	u64 snap_id;		/* current snapshot id (CEPH_NOSNAP = head) */
	int read_only;		/* set for snapshot mappings */

	struct list_head node;	/* entry on rbd_dev_list */

	/* list of snapshots (struct rbd_snap) */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
189
/* Serializes control operations (add/remove/refresh) across devices. */
static DEFINE_MUTEX(ctl_mutex);

/* All mapped devices, protected by rbd_dev_list_lock. */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All shared ceph clients, protected by rbd_client_list_lock. */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for sysfs and snapshot helpers defined later. */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

/* /sys/bus/rbd/{add,remove}: write-only controls for (un)mapping images. */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name = "rbd",
	.bus_attrs = rbd_bus_attrs,
};

/* No-op release: rbd_root_dev is static and never freed. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* sysfs parent of every rbd device. */
static struct device rbd_root_dev = {
	.init_name = "rbd",
	.release = rbd_root_dev_release,
};
231
232
/* Take a reference on the device's embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
237
/* Drop the reference taken by rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244
245static int rbd_open(struct block_device *bdev, fmode_t mode)
246{
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249 rbd_get_dev(rbd_dev);
250
251 set_device_ro(bdev, rbd_dev->read_only);
252
253 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254 return -EROFS;
255
256 return 0;
257}
258
/* Release hook: drop the device reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
267
/* Block device operations: only open/release are needed. */
static const struct block_device_operations rbd_bd_ops = {
	.owner = THIS_MODULE,
	.open = rbd_open,
	.release = rbd_release,
};
273
274
275
276
277
/*
 * Create a new ceph client for @opt, open a session, and register the
 * wrapper on rbd_client_list.  On success, ownership of @opt passes to
 * the ceph client and ownership of @rbd_opts to the returned wrapper.
 * On failure @opt is destroyed here (caller keeps @rbd_opts) and an
 * ERR_PTR is returned.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	/* Nested class: may be called with ctl_mutex already held. */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	/* NOTE(review): on the IS_ERR path above, -ENOMEM is returned
	 * rather than PTR_ERR(rbdc->client) — confirm intentional. */
	opt = NULL; /* consumed by ceph client; don't destroy below */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (opt)
		ceph_destroy_options(opt);
	return ERR_PTR(ret);
}
324
325
326
327
/*
 * Find an existing client with options matching @opt so it can be
 * shared, or NULL if none (or sharing is disabled via CEPH_OPT_NOSHARE).
 * Caller must hold rbd_client_list_lock.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
{
	struct rbd_client *client_node;

	if (opt->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (ceph_compare_options(opt, client_node->client) == 0)
			return client_node;
	return NULL;
}
340
341
342
343
/*
 * rbd mount option tokens.  Integer-valued options sort before
 * Opt_last_int, string-valued ones between the two _last_ markers;
 * parse_rbd_opts_token() relies on this ordering.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
358
359static int parse_rbd_opts_token(char *c, void *private)
360{
361 struct rbd_options *rbdopt = private;
362 substring_t argstr[MAX_OPT_ARGS];
363 int token, intval, ret;
364
365 token = match_token(c, rbdopt_tokens, argstr);
366 if (token < 0)
367 return -EINVAL;
368
369 if (token < Opt_last_int) {
370 ret = match_int(&argstr[0], &intval);
371 if (ret < 0) {
372 pr_err("bad mount option arg (not int) "
373 "at '%s'\n", c);
374 return ret;
375 }
376 dout("got int token %d val %d\n", token, intval);
377 } else if (token > Opt_last_int && token < Opt_last_string) {
378 dout("got string token %d val %s\n", token,
379 argstr[0].from);
380 } else {
381 dout("got token %d\n", token);
382 }
383
384 switch (token) {
385 case Opt_notify_timeout:
386 rbdopt->notify_timeout = intval;
387 break;
388 default:
389 BUG_ON(token);
390 }
391 return 0;
392}
393
394
395
396
397
/*
 * Get a ceph client for the given monitor address and option string:
 * reuse a compatible existing client (taking a reference) or create a
 * new one.  Returns the client or an ERR_PTR.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	opt = ceph_parse_options(options, mon_addr,
				 mon_addr + mon_addr_len,
				 parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		kfree(rbd_opts);
		return ERR_CAST(opt);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* using an existing client: take a reference, drop ours */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		ceph_destroy_options(opt);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	rbdc = rbd_client_create(opt, rbd_opts);

	/* rbd_client_create() consumed opt either way; on failure only
	 * rbd_opts remains ours to free. */
	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
441
442
443
444
445
446
/*
 * kref release callback: unlink the client from the global list and
 * destroy it, including its owned rbd_opts.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
460
461
462
463
464
/*
 * Drop the device's reference on its shared client and clear the
 * pointer; the client is destroyed when the last reference goes.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}
470
471
472
473
/* kref release callback: free a request collection. */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
482
483
484
485
486
/*
 * Decode an on-disk image header into the in-core representation.
 * @allocated_snaps is the number of snapshot slots the caller read from
 * disk; snapshot ids/sizes/names are only copied when it matches the
 * header's snap_count (otherwise the caller is expected to re-read with
 * the correct count).  Returns 0, -ENXIO on bad magic, -EINVAL on an
 * implausible snap count, or -ENOMEM.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
				u32 allocated_snaps,
				gfp_t gfp_flags)
{
	u32 i, snap_count;

	/* Verify the header magic before trusting any field. */
	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* Guard the snapc allocation size computation against overflow. */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	/* NOTE(review): snap_names_len comes straight from disk and is
	 * not range-checked before the allocation below — confirm the
	 * callers bound it. */
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}
	memcpy(header->block_name, ondisk->block_name,
	       sizeof(ondisk->block_name));

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* The packed name block follows the snapshot array. */
		memcpy(header->snap_names, &ondisk->snaps[i],
		       header->snap_names_len);
	}

	return 0;

err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
555
556static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
557 u64 *seq, u64 *size)
558{
559 int i;
560 char *p = header->snap_names;
561
562 for (i = 0; i < header->total_snaps; i++) {
563 if (!strcmp(snap_name, p)) {
564
565
566
567 if (seq)
568 *seq = header->snapc->snaps[i];
569 if (size)
570 *size = header->snap_sizes[i];
571 return i;
572 }
573 p += strlen(p) + 1;
574 }
575 return -ENOENT;
576}
577
/*
 * Apply the mapping's snapshot selection (dev->snap_name) to the
 * header: the head ("-") maps writable with snapc->seq tracking the
 * latest snapshot, anything else maps a named snapshot read-only.
 * On success *size (if non-NULL) receives the selected image size.
 * Returns 0 or -ENOENT if the named snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	/* snap_name must be able to hold the head sentinel. */
	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));

	down_write(&dev->header_rwsem);

	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->snap_id = CEPH_NOSNAP;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;
		dev->snap_id = snapc->seq;
		dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&dev->header_rwsem);
	return ret;
}
611
/* Free the allocations made by rbd_header_from_disk(). */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->snapc);
	kfree(header->snap_names);
	kfree(header->snap_sizes);
}
618
619
620
621
622static u64 rbd_get_segment(struct rbd_image_header *header,
623 const char *block_name,
624 u64 ofs, u64 len,
625 char *seg_name, u64 *segofs)
626{
627 u64 seg = ofs >> header->obj_order;
628
629 if (seg_name)
630 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
631 "%s.%012llx", block_name, seg);
632
633 ofs = ofs & ((1 << header->obj_order) - 1);
634 len = min_t(u64, len, (1 << header->obj_order) - ofs);
635
636 if (segofs)
637 *segofs = ofs;
638
639 return len;
640}
641
/*
 * Number of objects (segments) touched by the byte range [ofs, ofs+len).
 * NOTE(review): assumes len > 0 (len == 0 would make ofs+len-1 wrap)
 * and that the count fits in an int — confirm callers guarantee both.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}
649
650
651
652
653static u64 rbd_obj_bytes(struct rbd_image_header *header)
654{
655 return 1 << header->obj_order;
656}
657
658
659
660
661
662static void bio_chain_put(struct bio *chain)
663{
664 struct bio *tmp;
665
666 while (chain) {
667 tmp = chain;
668 chain = chain->bi_next;
669 bio_put(tmp);
670 }
671}
672
673
674
675
/*
 * Zero the data of a bio chain from byte offset @start_ofs onward
 * (bytes before start_ofs are untouched).  Used to blank the unread
 * tail of short or failed reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte offset of the current bvec within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero only the part of this bvec past start_ofs */
				int remainder = max(start_ofs - pos, 0);
				/* bvec pages may be highmem: map with irqs off */
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
699
700
701
702
703
/*
 * Clone up to @len bytes of the bio chain at *old into a new chain.
 * On return *old points at the first unconsumed source bio and *next
 * at the bio to resume from (either *old's successor or the second
 * half of a split).  *bp holds a live bio_pair when the last source
 * bio had to be split; any previous pair is released on entry.
 * Returns the new chain, or NULL on allocation/split failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* Release the split pair from a previous call, if any. */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * The source bio straddles the requested length:
			 * split it and clone only the first half.
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation may block/reclaim */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
776
777
778
779
780static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
781 int num_ops,
782 int opcode,
783 u32 payload_len)
784{
785 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
786 GFP_NOIO);
787 if (!*ops)
788 return -ENOMEM;
789 (*ops)[0].op = opcode;
790
791
792
793
794 (*ops)[0].payload_len = payload_len;
795 return 0;
796}
797
/* Free an op array allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
802
/*
 * Record completion of segment @index of a collection, then end (in
 * order) every contiguous run of finished segments starting at
 * num_done.  The queue lock serializes collection state against the
 * request function and is required by __blk_end_request().  With no
 * collection the whole request is ended directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* extend max over the contiguous completed run */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* drop the per-segment reference taken at submit time */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
840
/* Convenience wrapper: complete the segment described by @req. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
846
847
848
849
850static int rbd_do_request(struct request *rq,
851 struct rbd_device *dev,
852 struct ceph_snap_context *snapc,
853 u64 snapid,
854 const char *obj, u64 ofs, u64 len,
855 struct bio *bio,
856 struct page **pages,
857 int num_pages,
858 int flags,
859 struct ceph_osd_req_op *ops,
860 int num_reply,
861 struct rbd_req_coll *coll,
862 int coll_index,
863 void (*rbd_cb)(struct ceph_osd_request *req,
864 struct ceph_msg *msg),
865 struct ceph_osd_request **linger_req,
866 u64 *ver)
867{
868 struct ceph_osd_request *req;
869 struct ceph_file_layout *layout;
870 int ret;
871 u64 bno;
872 struct timespec mtime = CURRENT_TIME;
873 struct rbd_request *req_data;
874 struct ceph_osd_request_head *reqhead;
875 struct ceph_osd_client *osdc;
876
877 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
878 if (!req_data) {
879 if (coll)
880 rbd_coll_end_req_index(rq, coll, coll_index,
881 -ENOMEM, len);
882 return -ENOMEM;
883 }
884
885 if (coll) {
886 req_data->coll = coll;
887 req_data->coll_index = coll_index;
888 }
889
890 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
891
892 down_read(&dev->header_rwsem);
893
894 osdc = &dev->rbd_client->client->osdc;
895 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
896 false, GFP_NOIO, pages, bio);
897 if (!req) {
898 up_read(&dev->header_rwsem);
899 ret = -ENOMEM;
900 goto done_pages;
901 }
902
903 req->r_callback = rbd_cb;
904
905 req_data->rq = rq;
906 req_data->bio = bio;
907 req_data->pages = pages;
908 req_data->len = len;
909
910 req->r_priv = req_data;
911
912 reqhead = req->r_request->front.iov_base;
913 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
914
915 strncpy(req->r_oid, obj, sizeof(req->r_oid));
916 req->r_oid_len = strlen(req->r_oid);
917
918 layout = &req->r_file_layout;
919 memset(layout, 0, sizeof(*layout));
920 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
921 layout->fl_stripe_count = cpu_to_le32(1);
922 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
923 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
924 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
925 req, ops);
926
927 ceph_osdc_build_request(req, ofs, &len,
928 ops,
929 snapc,
930 &mtime,
931 req->r_oid, req->r_oid_len);
932 up_read(&dev->header_rwsem);
933
934 if (linger_req) {
935 ceph_osdc_set_request_linger(osdc, req);
936 *linger_req = req;
937 }
938
939 ret = ceph_osdc_start_request(osdc, req, false);
940 if (ret < 0)
941 goto done_err;
942
943 if (!rbd_cb) {
944 ret = ceph_osdc_wait_request(osdc, req);
945 if (ver)
946 *ver = le64_to_cpu(req->r_reassert_version.version);
947 dout("reassert_ver=%lld\n",
948 le64_to_cpu(req->r_reassert_version.version));
949 ceph_osdc_put_request(req);
950 }
951 return ret;
952
953done_err:
954 bio_chain_put(req_data->bio);
955 ceph_osdc_put_request(req);
956done_pages:
957 rbd_coll_end_req(req_data, ret, len);
958 kfree(req_data);
959 return ret;
960}
961
962
963
964
/*
 * Completion callback for async segment requests.  Decodes the reply,
 * zero-fills missing/short read data, completes the collection slot,
 * and releases the bio chain, osd request, and per-request context.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* nonexistent object reads as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1000
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1005
1006
1007
1008
/*
 * Perform a synchronous osd operation on @obj using a page vector as
 * the data buffer.  When @orig_ops is NULL a single op of @opcode is
 * built here (copying @buf in for writes); otherwise the caller's ops
 * are used unchanged.  Read data is copied back into @buf.  Returns
 * bytes transferred (reads) or status from rbd_do_request().
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			/* stage outgoing data into the page vector */
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* NULL callback makes rbd_do_request() synchronous */
	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1068
1069
1070
1071
/*
 * Submit an async read or write for one image segment on behalf of a
 * block-layer request.  Translates the image-relative (ofs, len) to
 * the owning object's name/offset and fires rbd_do_request() with
 * rbd_req_cb as completion.  The range must not cross a segment
 * boundary (callers split via rbd_get_segment()).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1124
1125
1126
1127
/* Submit an async write for one segment (head image only: CEPH_NOSNAP). */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1142
1143
1144
1145
/* Submit an async read for one segment at snapshot @snapid. */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1161
1162
1163
1164
/*
 * Synchronously read @len bytes of object @obj into @buf.
 * NOTE(review): the @snapc parameter is accepted but NULL is passed
 * through to rbd_req_sync_op() — confirm this is intentional.
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			     struct ceph_snap_context *snapc,
			     u64 snapid,
			     const char *obj,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       snapid,
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1180
1181
1182
1183
/*
 * Acknowledge a watch notification on @obj (async; completion just
 * drops the request via rbd_simple_req_cb).
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	/* NOTE(review): cookie is stored without cpu_to_le64(), unlike
	 * ver above — confirm notify_id is already wire-order here. */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1213
/*
 * Watch event callback for the header object: re-read the header so
 * size/snapshot changes made elsewhere become visible, then ack the
 * notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_refresh_header(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", dev->major, rc);

	/* ack even if the refresh failed, so the osd stops resending */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1233
1234
1235
1236
/*
 * Establish a watch on the header object @obj so rbd_watch_cb() is
 * invoked on changes.  The watch request lingers (dev->watch_request)
 * and the event handle is kept in dev->watch_event until
 * rbd_req_sync_unwatch().  Returns 0 or a negative errno.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 = register the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1278
1279
1280
1281
/*
 * Tear down the watch established by rbd_req_sync_watch(): send an
 * unwatch op (flag = 0) and cancel the local event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 = unregister the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	/* the event is canceled even if the unwatch op failed */
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}
1307
/* Context handed to rbd_notify_cb() while a notify is in flight. */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1311
/* Notify completion callback: only logs; the waiter handles the rest. */
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	if (!dev)
		return;

	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
}
1321
1322
1323
1324
/*
 * Send a notify on @obj (so watchers, including ourselves, refresh)
 * and wait for the notification to complete.
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
			       const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload carries two u32s: protocol version and timeout */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* NOTE(review): on this success path the event is never
	 * canceled (only fail_event cancels it) and the wait result is
	 * discarded — confirm whether the event leaks here. */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1372
1373
1374
1375
/*
 * Synchronously invoke an osd class method (@cls.@method) on @obj,
 * passing @len bytes of @data as input.  Optionally returns the object
 * version via *ver.
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	/* payload is class name + method name + input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1412
/*
 * Allocate a request collection sized for @num_reqs segment requests.
 * GFP_ATOMIC because it is called from the request function with the
 * queue lock held.  Returns NULL on allocation failure.
 */
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
		kzalloc(sizeof(struct rbd_req_coll) +
			sizeof(struct rbd_req_status) * num_reqs,
			GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}
1426
1427
1428
1429
/*
 * Block-layer request function.  For each fetched request: reject
 * non-fs requests and writes to read-only mappings, split the I/O at
 * object (segment) boundaries, clone the bio chain per segment, and
 * submit async reads/writes tracked by a per-request collection.
 * Entered with the queue lock held; it is dropped around submission.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock while submitting segment requests */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* one collection ref per in-flight segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}

			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial collection reference */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
	}
}
1527
1528
1529
1530
1531
1532
/*
 * merge_bvec callback: report how many bytes of @bvec may be appended to
 * the bio described by @bmd so that the bio never crosses an rbd object
 * boundary (objects are 1 << obj_order bytes).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* object size expressed in 512-byte sectors */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes remaining in the current object after this bio */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				+ bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot fit anything else */
	/* an empty bio must be allowed to take at least one bvec */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1554
/*
 * Tear down the gendisk and request queue for an rbd device, and free
 * the in-memory image header.  Safe to call when no disk was allocated.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* only unregister if add_disk() already ran */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1570
1571
1572
1573
/*
 * Read the on-disk image header object and decode it into @header.
 * The on-disk size depends on the snapshot count, which we do not know
 * up front, so read with a guessed size and retry with the exact size
 * once the snapshot count is known; loop again if it changed meanwhile.
 * Returns 0 (or a positive byte count from the final read) on success,
 * negative errno on failure.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First read just the fixed-size part; subsequent iterations read
	 * the fixed part plus snapshot records and the name blob.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s", rbd_dev->obj);
			goto out_dh;
		}

		/* done once the snapshot count we read matches our guess */
		if (snap_count == header->total_snaps)
			break;

		/* recompute the exact on-disk length and try again */
		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1627
1628
1629
1630
1631static int rbd_header_add_snap(struct rbd_device *dev,
1632 const char *snap_name,
1633 gfp_t gfp_flags)
1634{
1635 int name_len = strlen(snap_name);
1636 u64 new_snapid;
1637 int ret;
1638 void *data, *p, *e;
1639 u64 ver;
1640 struct ceph_mon_client *monc;
1641
1642
1643 if (dev->snap_id != CEPH_NOSNAP)
1644 return -EINVAL;
1645
1646 monc = &dev->rbd_client->client->monc;
1647 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1648 dout("created snapid=%lld\n", new_snapid);
1649 if (ret < 0)
1650 return ret;
1651
1652 data = kmalloc(name_len + 16, gfp_flags);
1653 if (!data)
1654 return -ENOMEM;
1655
1656 p = data;
1657 e = data + name_len + 16;
1658
1659 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1660 ceph_encode_64_safe(&p, e, new_snapid, bad);
1661
1662 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1663 data, p - data, &ver);
1664
1665 kfree(data);
1666
1667 if (ret < 0)
1668 return ret;
1669
1670 down_write(&dev->header_rwsem);
1671 dev->header.snapc->seq = new_snapid;
1672 up_write(&dev->header_rwsem);
1673
1674 return 0;
1675bad:
1676 return -ERANGE;
1677}
1678
1679static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1680{
1681 struct rbd_snap *snap;
1682
1683 while (!list_empty(&rbd_dev->snaps)) {
1684 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1685 __rbd_remove_snap_dev(rbd_dev, snap);
1686 }
1687}
1688
1689
1690
1691
/*
 * Re-read the image header from the OSDs and swap the freshly decoded
 * snapshot data into rbd_dev->header, then rebuild the snapshot device
 * list.  Caller must hold ctl_mutex.  Returns 0 or negative errno.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	/* NOTE(review): capacity is updated before taking header_rwsem —
	 * confirm readers of the disk size tolerate this ordering. */
	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);

	down_write(&rbd_dev->header_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	/* if seq was tracking the newest snapshot, keep tracking it */
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, so we have to follow it */
		follow_seq = 1;

	/* NOTE(review): snapc is released with kfree(); if the snap
	 * context is refcounted this should use its put helper — confirm. */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	/* adopt the newly read snapshot data (ownership moves from h) */
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1735
/*
 * Read the image header, build the snapshot list, select the mapped
 * snapshot, then allocate and register the gendisk and request queue.
 * Returns 0 on success, negative errno on failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size (one rbd object per I/O segment) */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1808
1809
1810
1811
1812
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1817
1818static ssize_t rbd_size_show(struct device *dev,
1819 struct device_attribute *attr, char *buf)
1820{
1821 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1822
1823 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1824}
1825
1826static ssize_t rbd_major_show(struct device *dev,
1827 struct device_attribute *attr, char *buf)
1828{
1829 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1830
1831 return sprintf(buf, "%d\n", rbd_dev->major);
1832}
1833
1834static ssize_t rbd_client_id_show(struct device *dev,
1835 struct device_attribute *attr, char *buf)
1836{
1837 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1838
1839 return sprintf(buf, "client%lld\n",
1840 ceph_client_id(rbd_dev->rbd_client->client));
1841}
1842
1843static ssize_t rbd_pool_show(struct device *dev,
1844 struct device_attribute *attr, char *buf)
1845{
1846 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1847
1848 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1849}
1850
1851static ssize_t rbd_name_show(struct device *dev,
1852 struct device_attribute *attr, char *buf)
1853{
1854 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855
1856 return sprintf(buf, "%s\n", rbd_dev->obj);
1857}
1858
1859static ssize_t rbd_snap_show(struct device *dev,
1860 struct device_attribute *attr,
1861 char *buf)
1862{
1863 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1864
1865 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1866}
1867
1868static ssize_t rbd_image_refresh(struct device *dev,
1869 struct device_attribute *attr,
1870 const char *buf,
1871 size_t size)
1872{
1873 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874 int rc;
1875 int ret = size;
1876
1877 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1878
1879 rc = __rbd_refresh_header(rbd_dev);
1880 if (rc < 0)
1881 ret = rc;
1882
1883 mutex_unlock(&ctl_mutex);
1884 return ret;
1885}
1886
/* Per-device sysfs attributes for an rbd device. */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* rbd_device is embedded in a larger structure freed elsewhere,
 * so the device release callback has nothing to do. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1926
1927
1928
1929
1930
1931
1932static ssize_t rbd_snap_size_show(struct device *dev,
1933 struct device_attribute *attr,
1934 char *buf)
1935{
1936 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1937
1938 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1939}
1940
1941static ssize_t rbd_snap_id_show(struct device *dev,
1942 struct device_attribute *attr,
1943 char *buf)
1944{
1945 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1946
1947 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1948}
1949
/* Per-snapshot sysfs attributes. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Final release of a snapshot device: frees the rbd_snap itself. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1979
/*
 * Unlink a snapshot from the device's snap list and unregister its
 * sysfs device; the release callback frees the rbd_snap.
 * Caller must hold ctl_mutex.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1986
1987static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1988 struct rbd_snap *snap,
1989 struct device *parent)
1990{
1991 struct device *dev = &snap->dev;
1992 int ret;
1993
1994 dev->type = &rbd_snap_device_type;
1995 dev->parent = parent;
1996 dev->release = rbd_snap_dev_release;
1997 dev_set_name(dev, "snap_%s", snap->name);
1998 ret = device_register(dev);
1999
2000 return ret;
2001}
2002
2003static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2004 int i, const char *name,
2005 struct rbd_snap **snapp)
2006{
2007 int ret;
2008 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2009 if (!snap)
2010 return -ENOMEM;
2011 snap->name = kstrdup(name, GFP_KERNEL);
2012 snap->size = rbd_dev->header.snap_sizes[i];
2013 snap->id = rbd_dev->header.snapc->snaps[i];
2014 if (device_is_registered(&rbd_dev->dev)) {
2015 ret = rbd_register_snap_dev(rbd_dev, snap,
2016 &rbd_dev->dev);
2017 if (ret < 0)
2018 goto err;
2019 }
2020 *snapp = snap;
2021 return 0;
2022err:
2023 kfree(snap->name);
2024 kfree(snap);
2025 return ret;
2026}
2027
2028
2029
2030
/*
 * Given @name pointing just past a NUL-terminated string inside the
 * packed snapshot-name blob beginning at @start, return a pointer to
 * the start of the preceding name, or NULL when @name is already at
 * (or before) the first name.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	/* need at least one character plus its terminator before @name */
	if (name < start + 2)
		return NULL;

	/* step over the previous name's NUL, then scan back to the
	 * terminator of the name before it (or to the blob start) */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2044
2045
2046
2047
2048
2049
2050
2051
2052
/*
 * Reconcile the device's snapshot list (rbd_dev->snaps) with the
 * snapshot ids/names in the freshly read header: drop entries whose id
 * no longer appears, keep matching ones, and create devices for new
 * ids.  Both the list (walked oldest-first via _prev) and the header's
 * snaps[] (walked from the end) are ordered, so this is a single merge
 * pass.  Caller must hold header_rwsem (and ctl_mutex once registered).
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* "name" starts one past the end of the packed name blob and is
	 * walked backwards with rbd_prev_snap_name() */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		/* header has ids newer than old_snap->id: add them */
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/* NOTE(review): reads snaps[i] here while the add
			 * below uses index i - 1 — confirm this off-by-one
			 * asymmetry is intended. */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot id is older than the current entry */
			if (cur_id >= old_snap->id)
				break;
			/* add snapshot device before the current entry */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2118
/*
 * Register the rbd device on the rbd bus under the rbd root device, and
 * register a sysfs device for every snapshot currently on the list.
 * Returns 0 or negative errno.
 * NOTE(review): a failure while registering snapshots leaves earlier
 * snapshot devices and the parent registered — confirm the caller's
 * error path cleans these up.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					    &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2147
/* Unregister the rbd device; the final put triggers rbd_dev_release(). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2152
2153static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154{
2155 int ret, rc;
2156
2157 do {
2158 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2159 rbd_dev->header.obj_version);
2160 if (ret == -ERANGE) {
2161 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2162 rc = __rbd_refresh_header(rbd_dev);
2163 mutex_unlock(&ctl_mutex);
2164 if (rc < 0)
2165 return rc;
2166 }
2167 } while (ret == -ERANGE);
2168
2169 return ret;
2170}
2171
/* Highest device id handed out so far; ids start at 1. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Assign the next available device id to @rbd_dev and link it onto the
 * global device list.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2186
2187
2188
2189
2190
2191static void rbd_id_put(struct rbd_device *rbd_dev)
2192{
2193 struct list_head *tmp;
2194 int rbd_id = rbd_dev->id;
2195 int max_id;
2196
2197 BUG_ON(rbd_id < 1);
2198
2199 spin_lock(&rbd_dev_list_lock);
2200 list_del_init(&rbd_dev->node);
2201
2202
2203
2204
2205
2206 if (rbd_id != atomic64_read(&rbd_id_max)) {
2207 spin_unlock(&rbd_dev_list_lock);
2208 return;
2209 }
2210
2211
2212
2213
2214
2215
2216 max_id = 0;
2217 list_for_each_prev(tmp, &rbd_dev_list) {
2218 struct rbd_device *rbd_dev;
2219
2220 rbd_dev = list_entry(tmp, struct rbd_device, node);
2221 if (rbd_id > max_id)
2222 max_id = rbd_id;
2223 }
2224 spin_unlock(&rbd_dev_list_lock);
2225
2226
2227
2228
2229
2230
2231
2232 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2233}
2234
2235
2236
2237
2238
2239
2240
/*
 * Advance *buf past any leading whitespace and return the length of the
 * token that follows (0 at end of string).  *buf is left pointing at
 * the first character of the token.
 */
static inline size_t next_token(const char **buf)
{
	/* the set of whitespace characters that delimit tokens */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* skip leading whitespace */

	return strcspn(*buf, spaces);	/* length up to next whitespace */
}
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
/*
 * Copy the next whitespace-delimited token from *buf into @token
 * (NUL-terminated) and advance *buf past it.  Returns the token's
 * length regardless; if it is >= token_size nothing is copied, and the
 * caller detects the overflow from the return value.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2283
2284
2285
2286
2287
2288
2289
/*
 * Parse the "add" string written to /sys/bus/rbd/add:
 *   <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * mon_addrs is returned as a pointer into @buf plus its length;
 * the remaining tokens are copied into @options and rbd_dev fields.
 * Returns 0 or -EINVAL on a missing/oversized token.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	/* size includes room for a terminating NUL the caller may add */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
	if (!len || len >= sizeof (rbd_dev->pool_name))
		return -EINVAL;

	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
	if (!len || len >= sizeof (rbd_dev->obj))
		return -EINVAL;

	/* We have the object length in hand, save it. */
	rbd_dev->obj_len = len;

	/* the header object name is "<image>" + RBD_SUFFIX */
	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
		     < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);

	/*
	 * The snapshot name is optional; default to mapping the head
	 * if it was not supplied.
	 */
	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
	if (!len)
		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		       sizeof (RBD_SNAP_HEAD_NAME));
	else if (len >= sizeof (rbd_dev->snap_name))
		return -EINVAL;

	return 0;
}
2342
2343static ssize_t rbd_add(struct bus_type *bus,
2344 const char *buf,
2345 size_t count)
2346{
2347 struct rbd_device *rbd_dev;
2348 const char *mon_addrs = NULL;
2349 size_t mon_addrs_size = 0;
2350 char *options = NULL;
2351 struct ceph_osd_client *osdc;
2352 int rc = -ENOMEM;
2353
2354 if (!try_module_get(THIS_MODULE))
2355 return -ENODEV;
2356
2357 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2358 if (!rbd_dev)
2359 goto err_nomem;
2360 options = kmalloc(count, GFP_KERNEL);
2361 if (!options)
2362 goto err_nomem;
2363
2364
2365 spin_lock_init(&rbd_dev->lock);
2366 INIT_LIST_HEAD(&rbd_dev->node);
2367 INIT_LIST_HEAD(&rbd_dev->snaps);
2368 init_rwsem(&rbd_dev->header_rwsem);
2369
2370 init_rwsem(&rbd_dev->header_rwsem);
2371
2372
2373 rbd_id_get(rbd_dev);
2374
2375
2376 BUILD_BUG_ON(DEV_NAME_LEN
2377 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2378 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2379
2380
2381 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2382 options, count);
2383 if (rc)
2384 goto err_put_id;
2385
2386 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2387 options);
2388 if (IS_ERR(rbd_dev->rbd_client)) {
2389 rc = PTR_ERR(rbd_dev->rbd_client);
2390 goto err_put_id;
2391 }
2392
2393
2394 osdc = &rbd_dev->rbd_client->client->osdc;
2395 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2396 if (rc < 0)
2397 goto err_out_client;
2398 rbd_dev->poolid = rc;
2399
2400
2401 rc = register_blkdev(0, rbd_dev->name);
2402 if (rc < 0)
2403 goto err_out_client;
2404 rbd_dev->major = rc;
2405
2406 rc = rbd_bus_add_dev(rbd_dev);
2407 if (rc)
2408 goto err_out_blkdev;
2409
2410
2411
2412
2413
2414
2415
2416 rc = rbd_init_disk(rbd_dev);
2417 if (rc)
2418 goto err_out_bus;
2419
2420 rc = rbd_init_watch_dev(rbd_dev);
2421 if (rc)
2422 goto err_out_bus;
2423
2424 return count;
2425
2426err_out_bus:
2427
2428
2429 rbd_bus_del_dev(rbd_dev);
2430 kfree(options);
2431 return rc;
2432
2433err_out_blkdev:
2434 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2435err_out_client:
2436 rbd_put_client(rbd_dev);
2437err_put_id:
2438 rbd_id_put(rbd_dev);
2439err_nomem:
2440 kfree(options);
2441 kfree(rbd_dev);
2442
2443 dout("Error adding device %s\n", buf);
2444 module_put(THIS_MODULE);
2445
2446 return (ssize_t) rc;
2447}
2448
2449static struct rbd_device *__rbd_get_dev(unsigned long id)
2450{
2451 struct list_head *tmp;
2452 struct rbd_device *rbd_dev;
2453
2454 spin_lock(&rbd_dev_list_lock);
2455 list_for_each(tmp, &rbd_dev_list) {
2456 rbd_dev = list_entry(tmp, struct rbd_device, node);
2457 if (rbd_dev->id == id) {
2458 spin_unlock(&rbd_dev_list_lock);
2459 return rbd_dev;
2460 }
2461 }
2462 spin_unlock(&rbd_dev_list_lock);
2463 return NULL;
2464}
2465
/*
 * Final release callback for an rbd device, invoked when its sysfs
 * device's last reference is dropped: cancel the header watch, drop the
 * ceph client, tear down the disk, free the id and the structure, and
 * release the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2492
/*
 * Handler for writes to /sys/bus/rbd/remove: parse the target device id
 * and unregister that device.  Returns @count on success, negative
 * errno on a bad id or unknown device.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; reject values that don't fit */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2526
/*
 * sysfs "create_snap" store: create a snapshot named by @buf, refresh
 * the header, then notify other watchers of the header object.
 * Returns @count on success, negative errno on failure.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* snprintf with size == count copies at most count - 1 bytes,
	 * which strips the trailing newline sysfs writes normally carry.
	 * NOTE(review): input without a newline loses its last character
	 * — confirm this is the intended behavior. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2567
2568
2569
2570
2571
2572static int rbd_sysfs_init(void)
2573{
2574 int ret;
2575
2576 ret = device_register(&rbd_root_dev);
2577 if (ret < 0)
2578 return ret;
2579
2580 ret = bus_register(&rbd_bus_type);
2581 if (ret < 0)
2582 device_unregister(&rbd_root_dev);
2583
2584 return ret;
2585}
2586
/* Undo rbd_sysfs_init(): drop the bus first, then the root device. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2592
2593int __init rbd_init(void)
2594{
2595 int rc;
2596
2597 rc = rbd_sysfs_init();
2598 if (rc)
2599 return rc;
2600 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2601 return 0;
2602}
2603
/* Module exit point: tear down the sysfs bus/root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2608
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
2620