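/*
 * rbd.c - Ceph RADOS Block Device (RBD) driver
 *
 * Maps images stored in a Ceph RADOS object store as Linux block
 * devices, with support for snapshots, layering (clones) and an
 * exclusive lock on the image header.
 */
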
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

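/*
 * The block layer addresses devices in units of 512-byte sectors,
 * independent of the size of the backing RADOS objects.
 */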
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

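/*
 * Increment the given counter, unless it is already 0, and return its
 * pre-increment value.  If the increment would take the counter past
 * INT_MAX it is undone and -EINVAL is returned instead.
 */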
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

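/*
 * Decrement the given counter and return its updated value.  A
 * negative result means the counter underflowed: the decrement is
 * undone and -EINVAL is returned.
 */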
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* "invalid" snapshot id index */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

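/* Feature bits */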
#define RBD_FEATURE_LAYERING		(1<<0)
#define RBD_FEATURE_STRIPINGV2		(1<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1<<2)
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK)

/* Features supported by this (client software for this) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

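/*
 * block device image metadata (in-memory version)
 */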
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

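/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that records this information.  The names
 * (pool_name, image_name, snap_name) are cached alongside the ids
 * for presentation purposes.
 */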
struct rbd_spec {
	u64 pool_id;
	const char *pool_name;

	const char *image_id;
	const char *image_name;

	u64 snap_id;
	const char *snap_name;

	struct kref kref;
};

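/*
 * an instance of the client.  multiple devices may share an rbd client.
 */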
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;
	struct list_head node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char *object_name;
	u64 offset;		/* object start byte */
	u64 length;		/* bytes from offset */
	unsigned long flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request *obj_request;	/* STAT op */
		struct {
			struct rbd_img_request *img_request;
			u64 img_offset;
			/* links for img_request->obj_requests list */
			struct list_head links;
		};
	};
	u32 which;		/* posn in image request list */

	enum obj_request_type type;
	union {
		struct bio *bio_list;
		struct {
			struct page **pages;
			u32 page_count;
		};
	};
	struct page **copyup_pages;
	u32 copyup_page_count;

	struct ceph_osd_request *osd_req;

	u64 xferred;		/* bytes transferred */
	int result;

	rbd_obj_callback_t callback;
	struct completion completion;

	struct kref kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard = 1 */
};

struct rbd_img_request {
	struct rbd_device *rbd_dev;
	u64 offset;		/* starting image byte offset */
	u64 length;		/* byte count from offset */
	unsigned long flags;
	union {
		u64 snap_id;			/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request *rq;			/* block request */
		struct rbd_obj_request *obj_request;	/* obj req initiator */
	};
	struct page **copyup_pages;
	u32 copyup_page_count;
	spinlock_t completion_lock;	/* protects next_completion */
	u32 next_completion;
	rbd_img_callback_t callback;
	u64 xferred;		/* aggregate bytes transferred */
	int result;		/* first nonzero obj_request result */

	u32 obj_request_count;
	struct list_head obj_requests;	/* rbd_obj_request structs */

	struct kref kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64 size;
	u64 features;
	bool read_only;
};

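/*
 * a single device
 */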
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	int minor;
	struct gendisk *disk;	/* blkdev's gendisk and rq */

	u32 image_format;	/* Either 1 or 2 */
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue, flags, open_count */

	struct rbd_image_header header;
	unsigned long flags;	/* possibly lock protected */
	struct rbd_spec *spec;
	struct rbd_options *opts;
	char *config_info;	/* add{,_single_major} string */

	struct ceph_object_id header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout layout;	/* used for all rbd requests */

	struct mutex watch_mutex;
	enum rbd_watch_state watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64 watch_cookie;
	struct delayed_work watch_dwork;

	struct rw_semaphore lock_rwsem;
	enum rbd_lock_state lock_state;
	struct rbd_client_id owner_cid;
	struct work_struct acquired_lock_work;
	struct work_struct released_lock_work;
	struct delayed_work lock_dwork;
	struct work_struct unlock_work;
	wait_queue_head_t lock_waitq;

	struct workqueue_struct *task_wq;

	struct rbd_spec *parent_spec;
	u64 parent_overlap;
	atomic_t parent_ref;
	struct rbd_device *parent;

	/* Block layer tags. */
	struct blk_mq_tag_set tag_set;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;

	struct rbd_mapping mapping;

	struct list_head node;

	/* sysfs related */
	struct device dev;
	unsigned long open_count;	/* protected by lock */
};

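/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 */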
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache *rbd_img_request_cache;
static struct kmem_cache *rbd_obj_request_cache;
static struct kmem_cache *rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

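/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */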
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_attribute rbd_bus_attrs_single_major[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major),
	__ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major),
	__ATTR_NULL
};

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
	       !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name	= "rbd",
	.release	= rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				     u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get the actual value the user space has set */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* a mapped snapshot can only ever be read-only */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing the lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

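/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts: on success ownership passes to the new
 * ceph_client, and on failure they are destroyed here.
 */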
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

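/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */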
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

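/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create a new one.
 */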
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client: remove it from the client list and release
 * the underlying ceph_client.  Called when the last reference to
 * the rbd_client is dropped.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop a reference to a ceph client node.  If it's not referenced
 * anymore, it gets destroyed.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

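/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */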
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* Format 1 images have no striping or feature metadata */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

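/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for a descending sort order.
 */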
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

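/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * decreasing order of snapshot id.
 */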
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

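/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */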
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the given number of bytes.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

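/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset indicates
 * how many bytes into that bio the first un-cloned byte starts.
 */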
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					 unsigned int *offset,
					 unsigned int len,
					 gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

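/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  Therefore once the EXISTS flag is set, the KNOWN
 * flag is as well.
 */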
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	ceph_osdc_cancel_request(obj_request->osd_req);
}

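/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 *
 * @timeout: in jiffies, 0 means "wait forever"
 */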
static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
				  unsigned long timeout)
{
	long ret;

	dout("%s %p\n", __func__, obj_request);
	ret = wait_for_completion_interruptible_timeout(
					&obj_request->completion,
					ceph_timeout_jiffies(timeout));
	if (ret <= 0) {
		if (ret == 0)
			ret = -ETIMEDOUT;
		rbd_obj_request_end(obj_request);
	} else {
		ret = 0;
	}

	dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
	return ret;
}

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return __rbd_obj_request_wait(obj_request, 0);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which approach is better.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

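/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */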
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short discard.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	/* discarding a non-existent object is not a problem */
	if (obj_request->result == -ENOENT)
		obj_request->result = 0;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We execute done
 * immediately.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request_img_data_test(obj_request))
		rbd_osd_copyup_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p\n", __func__, osd_req);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to the block layer, which just supports a 32-bit
	 * length field.
	 */
	obj_request->xferred = osd_req->r_ops[0].outdata_len;
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);

	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
			   osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
		/* fall through */
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
		rbd_osd_discard_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
		rbd_osd_call_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu",
			 obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	if (img_request)
		osd_req->r_snapid = img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_mtime = CURRENT_TIME;
	osd_req->r_data_offset = obj_request->offset;
}

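/*
 * Create an osd request.  A read request has one osd op (read).
 * A write request has either one (watch) or two (hint+write) osd ops.
 * (All rbd data writes are prefixed with an allocation hint op, but
 * technically osd watch is a write request, hence this distinction.)
 */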
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					unsigned int num_ops,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request) &&
		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
		struct rbd_img_request *img_request = obj_request->img_request;
		if (op_type == OBJ_OP_WRITE) {
			rbd_assert(img_request_write_test(img_request));
		} else {
			rbd_assert(img_request_discard_test(img_request));
		}
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	/* Allocate and initialize the request, now with the osd ops */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
					  GFP_NOIO);
	if (!osd_req)
		goto fail;

	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
			     obj_request->object_name))
		goto fail;

	if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
		goto fail;

	return osd_req;

fail:
	ceph_osdc_put_request(osd_req);
	return NULL;
}

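/*
 * Create a copyup osd request based on the information in the object
 * request supplied.  A copyup request has two or three osd ops, a
 * copyup method call, potentially a hint op, and a write or truncate
 * or zero op.
 */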
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	int num_osd_ops = 3;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request) ||
			img_request_discard_test(img_request));

	if (img_request_discard_test(img_request))
		num_osd_ops = 2;

	/* Allocate and initialize the request, now with the osd ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
						false, GFP_NOIO);
	if (!osd_req)
		goto fail;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
			     obj_request->object_name))
		goto fail;

	if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
		goto fail;

	return osd_req;

fail:
	ceph_osdc_put_request(osd_req);
	return NULL;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_NOIO);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

static void rbd_spec_put(struct rbd_spec *spec);

/* It's OK to call this for a device with no parent */
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

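/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */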
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

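/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */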
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

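/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */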
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (op_type == OBJ_OP_DISCARD) {
		img_request_discard_set(img_request);
		img_request->snapc = snapc;
	} else if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request) ||
		img_request_discard_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
2370
2371static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2372{
2373 struct rbd_img_request *img_request;
2374 unsigned int xferred;
2375 int result;
2376 bool more;
2377
2378 rbd_assert(obj_request_img_data_test(obj_request));
2379 img_request = obj_request->img_request;
2380
2381 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2382 xferred = (unsigned int)obj_request->xferred;
2383 result = obj_request->result;
2384 if (result) {
2385 struct rbd_device *rbd_dev = img_request->rbd_dev;
2386 enum obj_operation_type op_type;
2387
2388 if (img_request_discard_test(img_request))
2389 op_type = OBJ_OP_DISCARD;
2390 else if (img_request_write_test(img_request))
2391 op_type = OBJ_OP_WRITE;
2392 else
2393 op_type = OBJ_OP_READ;
2394
2395 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2396 obj_op_name(op_type), obj_request->length,
2397 obj_request->img_offset, obj_request->offset);
2398 rbd_warn(rbd_dev, " result %d xferred %x",
2399 result, xferred);
		if (!img_request->result)
			img_request->result = result;

		/*
		 * On error, end I/O on the entire length of the object
		 * request; a failed request has no meaningful partial
		 * completion to report.
		 */
		xferred = obj_request->length;
2407 }
2408
2409
2410
2411 if (obj_request->type == OBJ_REQUEST_PAGES) {
2412 obj_request->pages = NULL;
2413 obj_request->page_count = 0;
2414 }
2415
2416 if (img_request_child_test(img_request)) {
2417 rbd_assert(img_request->obj_request != NULL);
2418 more = obj_request->which < img_request->obj_request_count - 1;
2419 } else {
2420 rbd_assert(img_request->rq != NULL);
2421
2422 more = blk_update_request(img_request->rq, result, xferred);
2423 if (!more)
2424 __blk_mq_end_request(img_request->rq, result);
2425 }
2426
2427 return more;
2428}
2429
2430static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2431{
2432 struct rbd_img_request *img_request;
2433 u32 which = obj_request->which;
2434 bool more = true;
2435
2436 rbd_assert(obj_request_img_data_test(obj_request));
2437 img_request = obj_request->img_request;
2438
2439 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2440 rbd_assert(img_request != NULL);
2441 rbd_assert(img_request->obj_request_count > 0);
2442 rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);

	/*
	 * Object requests can complete in any order, but they must be
	 * ended against the block layer in order: walk forward from
	 * next_completion and stop at the first one not yet done.
	 */
	spin_lock_irq(&img_request->completion_lock);
2446 if (which != img_request->next_completion)
2447 goto out;
2448
2449 for_each_obj_request_from(img_request, obj_request) {
2450 rbd_assert(more);
2451 rbd_assert(which < img_request->obj_request_count);
2452
2453 if (!obj_request_done_test(obj_request))
2454 break;
2455 more = rbd_img_obj_end_request(obj_request);
2456 which++;
2457 }
2458
2459 rbd_assert(more ^ (which == img_request->obj_request_count));
2460 img_request->next_completion = which;
2461out:
2462 spin_unlock_irq(&img_request->completion_lock);
2463 rbd_img_request_put(img_request);
2464
2465 if (!more)
2466 rbd_img_request_complete(img_request);
2467}
2468
2469
2470
2471
2472
2473
2474static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2475 struct ceph_osd_request *osd_request,
2476 enum obj_operation_type op_type,
2477 unsigned int num_ops)
2478{
2479 struct rbd_img_request *img_request = obj_request->img_request;
2480 struct rbd_device *rbd_dev = img_request->rbd_dev;
2481 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2482 u64 offset = obj_request->offset;
2483 u64 length = obj_request->length;
2484 u64 img_end;
2485 u16 opcode;
2486
2487 if (op_type == OBJ_OP_DISCARD) {
2488 if (!offset && length == object_size &&
2489 (!img_request_layered_test(img_request) ||
2490 !obj_request_overlaps_parent(obj_request))) {
2491 opcode = CEPH_OSD_OP_DELETE;
		} else if (offset + length == object_size) {
			opcode = CEPH_OSD_OP_TRUNCATE;
2494 } else {
2495 down_read(&rbd_dev->header_rwsem);
2496 img_end = rbd_dev->header.image_size;
2497 up_read(&rbd_dev->header_rwsem);
2498
2499 if (obj_request->img_offset + length == img_end)
2500 opcode = CEPH_OSD_OP_TRUNCATE;
2501 else
2502 opcode = CEPH_OSD_OP_ZERO;
2503 }
2504 } else if (op_type == OBJ_OP_WRITE) {
2505 if (!offset && length == object_size)
2506 opcode = CEPH_OSD_OP_WRITEFULL;
2507 else
2508 opcode = CEPH_OSD_OP_WRITE;
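		/* writes lead with an object allocation hint */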
2509 osd_req_op_alloc_hint_init(osd_request, num_ops,
2510 object_size, object_size);
2511 num_ops++;
2512 } else {
2513 opcode = CEPH_OSD_OP_READ;
2514 }
2515
2516 if (opcode == CEPH_OSD_OP_DELETE)
2517 osd_req_op_init(osd_request, num_ops, opcode, 0);
2518 else
2519 osd_req_op_extent_init(osd_request, num_ops, opcode,
2520 offset, length, 0, 0);
2521
2522 if (obj_request->type == OBJ_REQUEST_BIO)
2523 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2524 obj_request->bio_list, length);
2525 else if (obj_request->type == OBJ_REQUEST_PAGES)
2526 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2527 obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	/* writes and discards carry a snapshot context; reads a snap id */
	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2532 rbd_osd_req_format_write(obj_request);
2533 else
2534 rbd_osd_req_format_read(obj_request);
2535}
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545static int rbd_img_request_fill(struct rbd_img_request *img_request,
2546 enum obj_request_type type,
2547 void *data_desc)
2548{
2549 struct rbd_device *rbd_dev = img_request->rbd_dev;
2550 struct rbd_obj_request *obj_request = NULL;
2551 struct rbd_obj_request *next_obj_request;
2552 struct bio *bio_list = NULL;
2553 unsigned int bio_offset = 0;
2554 struct page **pages = NULL;
2555 enum obj_operation_type op_type;
2556 u64 img_offset;
2557 u64 resid;
2558
2559 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2560 (int)type, data_desc);
2561
2562 img_offset = img_request->offset;
2563 resid = img_request->length;
2564 rbd_assert(resid > 0);
2565 op_type = rbd_img_request_op_type(img_request);
2566
	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset ==
			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2570 } else if (type == OBJ_REQUEST_PAGES) {
2571 pages = data_desc;
2572 }
2573
2574 while (resid) {
2575 struct ceph_osd_request *osd_req;
2576 const char *object_name;
2577 u64 offset;
2578 u64 length;
2579
2580 object_name = rbd_segment_name(rbd_dev, img_offset);
2581 if (!object_name)
2582 goto out_unwind;
2583 offset = rbd_segment_offset(rbd_dev, img_offset);
2584 length = rbd_segment_length(rbd_dev, img_offset, resid);
2585 obj_request = rbd_obj_request_create(object_name,
2586 offset, length, type);
2587
2588 rbd_segment_name_free(object_name);
2589 if (!obj_request)
			goto out_unwind;

		/*
		 * Add the object request to the image request before
		 * creating its osd request so it picks up the correct
		 * snapshot context.
		 */
		rbd_img_obj_request_add(img_request, obj_request);
2597
2598 if (type == OBJ_REQUEST_BIO) {
2599 unsigned int clone_size;
2600
2601 rbd_assert(length <= (u64)UINT_MAX);
2602 clone_size = (unsigned int)length;
2603 obj_request->bio_list =
2604 bio_chain_clone_range(&bio_list,
2605 &bio_offset,
2606 clone_size,
2607 GFP_NOIO);
2608 if (!obj_request->bio_list)
2609 goto out_unwind;
2610 } else if (type == OBJ_REQUEST_PAGES) {
2611 unsigned int page_count;
2612
2613 obj_request->pages = pages;
2614 page_count = (u32)calc_pages_for(offset, length);
2615 obj_request->page_count = page_count;
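			/* a boundary page may be shared with the next request */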
2616 if ((offset + length) & ~PAGE_MASK)
2617 page_count--;
2618 pages += page_count;
2619 }
2620
2621 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2622 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2623 obj_request);
2624 if (!osd_req)
2625 goto out_unwind;
2626
2627 obj_request->osd_req = osd_req;
2628 obj_request->callback = rbd_img_obj_callback;
2629 obj_request->img_offset = img_offset;
2630
2631 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2632
2633 rbd_img_request_get(img_request);
2634
2635 img_offset += length;
2636 resid -= length;
2637 }
2638
2639 return 0;
2640
2641out_unwind:
2642 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2643 rbd_img_obj_request_del(img_request, obj_request);
2644
2645 return -ENOMEM;
2646}
2647
2648static void
2649rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2650{
2651 struct rbd_img_request *img_request;
2652 struct rbd_device *rbd_dev;
2653 struct page **pages;
2654 u32 page_count;
2655
2656 dout("%s: obj %p\n", __func__, obj_request);
2657
2658 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2659 obj_request->type == OBJ_REQUEST_NODATA);
2660 rbd_assert(obj_request_img_data_test(obj_request));
2661 img_request = obj_request->img_request;
2662 rbd_assert(img_request);
2663
2664 rbd_dev = img_request->rbd_dev;
2665 rbd_assert(rbd_dev);
2666
2667 pages = obj_request->copyup_pages;
2668 rbd_assert(pages != NULL);
2669 obj_request->copyup_pages = NULL;
2670 page_count = obj_request->copyup_page_count;
2671 rbd_assert(page_count);
2672 obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request succeeded we can
	 * just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;
2683
2684 obj_request_done_set(obj_request);
2685}
2686
2687static void
2688rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2689{
2690 struct rbd_obj_request *orig_request;
2691 struct ceph_osd_request *osd_req;
2692 struct ceph_osd_client *osdc;
2693 struct rbd_device *rbd_dev;
2694 struct page **pages;
2695 enum obj_operation_type op_type;
2696 u32 page_count;
2697 int img_result;
2698 u64 parent_length;
2699
	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */
	pages = img_request->copyup_pages;
2705 rbd_assert(pages != NULL);
2706 img_request->copyup_pages = NULL;
2707 page_count = img_request->copyup_page_count;
2708 rbd_assert(page_count);
2709 img_request->copyup_page_count = 0;
2710
2711 orig_request = img_request->obj_request;
2712 rbd_assert(orig_request != NULL);
2713 rbd_assert(obj_request_type_valid(orig_request->type));
2714 img_result = img_request->result;
2715 parent_length = img_request->length;
2716 rbd_assert(parent_length == img_request->xferred);
2717 rbd_img_request_put(img_request);
2718
2719 rbd_assert(orig_request->img_request);
2720 rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the image
	 * has been flattened) we need to free the pages and re-submit
	 * the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
2729 struct ceph_osd_client *osdc;
2730
2731 ceph_release_page_vector(pages, page_count);
2732 osdc = &rbd_dev->rbd_client->client->osdc;
2733 img_result = rbd_obj_request_submit(osdc, orig_request);
2734 if (!img_result)
2735 return;
2736 }
2737
	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us anymore.  We
	 * need a new one that can hold the copyup and write/discard
	 * ops.  Allocate the new copyup osd request for the original
	 * request, and release the old one.
	 */
	img_result = -ENOMEM;
2748 osd_req = rbd_osd_req_create_copyup(orig_request);
2749 if (!osd_req)
2750 goto out_err;
2751 rbd_osd_req_destroy(orig_request->osd_req);
2752 orig_request->osd_req = osd_req;
2753 orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */
	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2759 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
					  false, false);

	/* Add the write/discard op(s) after the copyup call */
	op_type = rbd_img_request_op_type(orig_request->img_request);
	rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);

	/* All set, send it off. */
	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
2771 if (!img_result)
2772 return;
out_err:
	/* Record the error code and complete the request */
	orig_request->result = img_result;
2777 orig_request->xferred = 0;
2778 obj_request_done_set(orig_request);
2779 rbd_obj_request_complete(orig_request);
2780}
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2797{
2798 struct rbd_img_request *img_request = NULL;
2799 struct rbd_img_request *parent_request = NULL;
2800 struct rbd_device *rbd_dev;
2801 u64 img_offset;
2802 u64 length;
2803 struct page **pages = NULL;
2804 u32 page_count;
2805 int result;
2806
2807 rbd_assert(obj_request_img_data_test(obj_request));
2808 rbd_assert(obj_request_type_valid(obj_request->type));
2809
2810 img_request = obj_request->img_request;
2811 rbd_assert(img_request != NULL);
2812 rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
2837 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2838 if (IS_ERR(pages)) {
2839 result = PTR_ERR(pages);
2840 pages = NULL;
2841 goto out_err;
2842 }
2843
2844 result = -ENOMEM;
2845 parent_request = rbd_parent_request_create(obj_request,
2846 img_offset, length);
2847 if (!parent_request)
2848 goto out_err;
2849
2850 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2851 if (result)
2852 goto out_err;
2853 parent_request->copyup_pages = pages;
2854 parent_request->copyup_page_count = page_count;
2855
2856 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2857 result = rbd_img_request_submit(parent_request);
2858 if (!result)
2859 return 0;
2860
	/* submission failed - undo what rbd_parent_request_create() set up */
	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
2865out_err:
2866 if (pages)
2867 ceph_release_page_vector(pages, page_count);
2868 if (parent_request)
2869 rbd_img_request_put(parent_request);
2870 obj_request->result = result;
2871 obj_request->xferred = 0;
2872 obj_request_done_set(obj_request);
2873
2874 return result;
2875}
2876
2877static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2878{
2879 struct rbd_obj_request *orig_request;
2880 struct rbd_device *rbd_dev;
2881 int result;
2882
	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the stat request is the original request
	 * and the result of the STAT op.  Grab those, then we're
	 * done with the stat request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request); /* drop the stat request's ref */
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);
2895
2896 result = obj_request->result;
2897 obj_request->result = 0;
2898
2899 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2900 obj_request, orig_request, result,
2901 obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the image
	 * has been flattened) we need to re-submit the original
	 * request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
2910 if (!rbd_dev->parent_overlap) {
2911 struct ceph_osd_client *osdc;
2912
2913 osdc = &rbd_dev->rbd_client->client->osdc;
2914 result = rbd_obj_request_submit(osdc, orig_request);
2915 if (!result)
2916 return;
2917 }
2918
2919
2920
2921
2922
2923
2924
2925 if (!result) {
2926 obj_request_existence_set(orig_request, true);
2927 } else if (result == -ENOENT) {
2928 obj_request_existence_set(orig_request, false);
2929 } else if (result) {
2930 orig_request->result = result;
2931 goto out;
2932 }
2933
2934
2935
2936
2937
2938 orig_request->result = rbd_img_obj_request_submit(orig_request);
2939out:
2940 if (orig_request->result)
2941 rbd_obj_request_complete(orig_request);
2942}
2943
2944static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2945{
2946 struct rbd_obj_request *stat_request;
2947 struct rbd_device *rbd_dev;
2948 struct ceph_osd_client *osdc;
2949 struct page **pages = NULL;
2950 u32 page_count;
2951 size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2963 page_count = (u32)calc_pages_for(0, size);
2964 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2965 if (IS_ERR(pages))
2966 return PTR_ERR(pages);
2967
2968 ret = -ENOMEM;
2969 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2970 OBJ_REQUEST_PAGES);
2971 if (!stat_request)
2972 goto out;
2973
	rbd_obj_request_get(obj_request); /* stat request owns a ref */
	stat_request->obj_request = obj_request;
2976 stat_request->pages = pages;
2977 stat_request->page_count = page_count;
2978
2979 rbd_assert(obj_request->img_request);
2980 rbd_dev = obj_request->img_request->rbd_dev;
2981 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2982 stat_request);
2983 if (!stat_request->osd_req)
2984 goto out;
2985 stat_request->callback = rbd_img_obj_exists_callback;
2986
2987 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2988 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2989 false, false);
2990 rbd_osd_req_format_read(stat_request);
2991
2992 osdc = &rbd_dev->rbd_client->client->osdc;
2993 ret = rbd_obj_request_submit(osdc, stat_request);
2994out:
2995 if (ret)
2996 rbd_obj_request_put(obj_request);
2997
2998 return ret;
2999}
3000
3001static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
3002{
3003 struct rbd_img_request *img_request;
3004 struct rbd_device *rbd_dev;
3005
3006 rbd_assert(obj_request_img_data_test(obj_request));
3007
3008 img_request = obj_request->img_request;
3009 rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/* Reads */
	if (!img_request_write_test(img_request) &&
	    !img_request_discard_test(img_request))
		return true;

	/* Non-layered writes */
	if (!img_request_layered_test(img_request))
		return true;

	/*
	 * Layered writes outside of the parent overlap range don't
	 * share any data with the parent.
	 */
	if (!obj_request_overlaps_parent(obj_request))
		return true;

	/*
	 * Entire-object layered writes - we will overwrite whatever
	 * parent data is there anyway.
	 */
	if (!obj_request->offset &&
	    obj_request->length == rbd_obj_bytes(&rbd_dev->header))
		return true;

	/*
	 * If the object is known to already exist, its parent data
	 * has already been copied.
	 */
	if (obj_request_known_test(obj_request) &&
	    obj_request_exists_test(obj_request))
		return true;
3043
3044 return false;
3045}
3046
3047static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
3048{
3049 if (img_obj_request_simple(obj_request)) {
3050 struct rbd_device *rbd_dev;
3051 struct ceph_osd_client *osdc;
3052
3053 rbd_dev = obj_request->img_request->rbd_dev;
3054 osdc = &rbd_dev->rbd_client->client->osdc;
3055
3056 return rbd_obj_request_submit(osdc, obj_request);
3057 }
3058
3059
3060
3061
3062
3063
3064
3065 if (obj_request_known_test(obj_request))
3066 return rbd_img_obj_parent_read_full(obj_request);
3067
3068
3069
3070 return rbd_img_obj_exists_submit(obj_request);
3071}
3072
3073static int rbd_img_request_submit(struct rbd_img_request *img_request)
3074{
3075 struct rbd_obj_request *obj_request;
3076 struct rbd_obj_request *next_obj_request;
3077 int ret = 0;
3078
3079 dout("%s: img %p\n", __func__, img_request);
3080
3081 rbd_img_request_get(img_request);
3082 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
3083 ret = rbd_img_obj_request_submit(obj_request);
3084 if (ret)
3085 goto out_put_ireq;
3086 }
3087
3088out_put_ireq:
3089 rbd_img_request_put(img_request);
3090 return ret;
3091}
3092
3093static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3094{
3095 struct rbd_obj_request *obj_request;
3096 struct rbd_device *rbd_dev;
3097 u64 obj_end;
3098 u64 img_xferred;
3099 int img_result;
3100
	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */
	obj_request = img_request->obj_request;
3106 img_xferred = img_request->xferred;
3107 img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the image
	 * has been flattened) we need to re-submit the original
	 * request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
3119 struct ceph_osd_client *osdc;
3120
3121 osdc = &rbd_dev->rbd_client->client->osdc;
3122 img_result = rbd_obj_request_submit(osdc, obj_request);
3123 if (!img_result)
3124 return;
3125 }
3126
3127 obj_request->result = img_result;
3128 if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback() will
	 * zero anything beyond the end of a short read, an easy way
	 * to do this is to pretend the data from the parent came up
	 * short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3139 obj_end = obj_request->img_offset + obj_request->length;
3140 if (obj_end > rbd_dev->parent_overlap) {
3141 u64 xferred = 0;
3142
3143 if (obj_request->img_offset < rbd_dev->parent_overlap)
3144 xferred = rbd_dev->parent_overlap -
3145 obj_request->img_offset;
3146
3147 obj_request->xferred = min(img_xferred, xferred);
3148 } else {
3149 obj_request->xferred = img_xferred;
3150 }
3151out:
3152 rbd_img_obj_request_read_callback(obj_request);
3153 rbd_obj_request_complete(obj_request);
3154}
3155
3156static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3157{
3158 struct rbd_img_request *img_request;
3159 int result;
3160
3161 rbd_assert(obj_request_img_data_test(obj_request));
3162 rbd_assert(obj_request->img_request != NULL);
3163 rbd_assert(obj_request->result == (s32) -ENOENT);
3164 rbd_assert(obj_request_type_valid(obj_request->type));
3165
3166
3167 img_request = rbd_parent_request_create(obj_request,
3168 obj_request->img_offset,
3169 obj_request->length);
3170 result = -ENOMEM;
3171 if (!img_request)
3172 goto out_err;
3173
3174 if (obj_request->type == OBJ_REQUEST_BIO)
3175 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3176 obj_request->bio_list);
3177 else
3178 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3179 obj_request->pages);
3180 if (result)
3181 goto out_err;
3182
3183 img_request->callback = rbd_img_parent_read_callback;
3184 result = rbd_img_request_submit(img_request);
3185 if (result)
3186 goto out_err;
3187
3188 return;
3189out_err:
3190 if (img_request)
3191 rbd_img_request_put(img_request);
3192 obj_request->result = result;
3193 obj_request->xferred = 0;
3194 obj_request_done_set(obj_request);
3195}
3196
3197static const struct rbd_client_id rbd_empty_cid;
3198
3199static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3200 const struct rbd_client_id *rhs)
3201{
3202 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3203}
3204
3205static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3206{
3207 struct rbd_client_id cid;
3208
3209 mutex_lock(&rbd_dev->watch_mutex);
3210 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3211 cid.handle = rbd_dev->watch_cookie;
3212 mutex_unlock(&rbd_dev->watch_mutex);
3213 return cid;
3214}
3215
3216
3217
3218
3219static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3220 const struct rbd_client_id *cid)
3221{
3222 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3223 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3224 cid->gid, cid->handle);
3225 rbd_dev->owner_cid = *cid;
3226}
3227
3228static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3229{
3230 mutex_lock(&rbd_dev->watch_mutex);
3231 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3232 mutex_unlock(&rbd_dev->watch_mutex);
3233}
3234
3235
3236
3237
3238static int rbd_lock(struct rbd_device *rbd_dev)
3239{
3240 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3241 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3242 char cookie[32];
3243 int ret;
3244
3245 WARN_ON(__rbd_is_lock_owner(rbd_dev));
3246
3247 format_lock_cookie(rbd_dev, cookie);
3248 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3249 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3250 RBD_LOCK_TAG, "", 0);
3251 if (ret)
3252 return ret;
3253
3254 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3255 rbd_set_owner_cid(rbd_dev, &cid);
3256 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3257 return 0;
3258}
3259
3260
3261
3262
3263static int rbd_unlock(struct rbd_device *rbd_dev)
3264{
3265 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3266 char cookie[32];
3267 int ret;
3268
3269 WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3270
3271 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3272
3273 format_lock_cookie(rbd_dev, cookie);
3274 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3275 RBD_LOCK_NAME, cookie);
3276 if (ret && ret != -ENOENT) {
3277 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3278 return ret;
3279 }
3280
3281 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3282 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3283 return 0;
3284}
3285
3286static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3287 enum rbd_notify_op notify_op,
3288 struct page ***preply_pages,
3289 size_t *preply_len)
3290{
3291 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3292 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
	/* fixed-size buffer avoids a variable-length array */
	char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
	int buf_size = sizeof(buf);
3295 void *p = buf;
3296
	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);

	/* encode *LockPayload NotifyMessage (op + ClientId) */
	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3301 ceph_encode_32(&p, notify_op);
3302 ceph_encode_64(&p, cid.gid);
3303 ceph_encode_64(&p, cid.handle);
3304
3305 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3306 &rbd_dev->header_oloc, buf, buf_size,
3307 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3308}
3309
3310static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3311 enum rbd_notify_op notify_op)
3312{
3313 struct page **reply_pages;
3314 size_t reply_len;
3315
3316 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3317 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3318}
3319
3320static void rbd_notify_acquired_lock(struct work_struct *work)
3321{
3322 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3323 acquired_lock_work);
3324
3325 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3326}
3327
3328static void rbd_notify_released_lock(struct work_struct *work)
3329{
3330 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3331 released_lock_work);
3332
3333 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3334}
3335
3336static int rbd_request_lock(struct rbd_device *rbd_dev)
3337{
3338 struct page **reply_pages;
3339 size_t reply_len;
3340 bool lock_owner_responded = false;
3341 int ret;
3342
3343 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3344
3345 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3346 &reply_pages, &reply_len);
3347 if (ret && ret != -ETIMEDOUT) {
3348 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3349 goto out;
3350 }
3351
3352 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3353 void *p = page_address(reply_pages[0]);
3354 void *const end = p + reply_len;
3355 u32 n;
3356
3357 ceph_decode_32_safe(&p, end, n, e_inval);
3358 while (n--) {
3359 u8 struct_v;
3360 u32 len;
3361
			/* skip the notifier's ClientId (gid + handle) */
			ceph_decode_need(&p, end, 8 + 8, e_inval);
			p += 8 + 8;
3364
3365 ceph_decode_32_safe(&p, end, len, e_inval);
3366 if (!len)
3367 continue;
3368
3369 if (lock_owner_responded) {
3370 rbd_warn(rbd_dev,
3371 "duplicate lock owners detected");
3372 ret = -EIO;
3373 goto out;
3374 }
3375
3376 lock_owner_responded = true;
3377 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3378 &struct_v, &len);
3379 if (ret) {
3380 rbd_warn(rbd_dev,
3381 "failed to decode ResponseMessage: %d",
3382 ret);
3383 goto e_inval;
3384 }
3385
3386 ret = ceph_decode_32(&p);
3387 }
3388 }
3389
3390 if (!lock_owner_responded) {
3391 rbd_warn(rbd_dev, "no lock owners detected");
3392 ret = -ETIMEDOUT;
3393 }
3394
3395out:
3396 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3397 return ret;
3398
3399e_inval:
3400 ret = -EINVAL;
3401 goto out;
3402}
3403
3404static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3405{
3406 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3407
3408 cancel_delayed_work(&rbd_dev->lock_dwork);
3409 if (wake_all)
3410 wake_up_all(&rbd_dev->lock_waitq);
3411 else
3412 wake_up(&rbd_dev->lock_waitq);
3413}
3414
3415static int get_lock_owner_info(struct rbd_device *rbd_dev,
3416 struct ceph_locker **lockers, u32 *num_lockers)
3417{
3418 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3419 u8 lock_type;
3420 char *lock_tag;
3421 int ret;
3422
3423 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3424
3425 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3426 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3427 &lock_type, &lock_tag, lockers, num_lockers);
3428 if (ret)
3429 return ret;
3430
3431 if (*num_lockers == 0) {
3432 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3433 goto out;
3434 }
3435
3436 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3437 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3438 lock_tag);
3439 ret = -EBUSY;
3440 goto out;
3441 }
3442
3443 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3444 rbd_warn(rbd_dev, "shared lock type detected");
3445 ret = -EBUSY;
3446 goto out;
3447 }
3448
3449 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3450 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3451 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3452 (*lockers)[0].id.cookie);
3453 ret = -EBUSY;
3454 goto out;
3455 }
3456
3457out:
3458 kfree(lock_tag);
3459 return ret;
3460}
3461
3462static int find_watcher(struct rbd_device *rbd_dev,
3463 const struct ceph_locker *locker)
3464{
3465 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3466 struct ceph_watch_item *watchers;
3467 u32 num_watchers;
3468 u64 cookie;
3469 int i;
3470 int ret;
3471
3472 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3473 &rbd_dev->header_oloc, &watchers,
3474 &num_watchers);
3475 if (ret)
3476 return ret;
3477
	/* the lock cookie embeds the owner's watch cookie */
	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3479 for (i = 0; i < num_watchers; i++) {
3480 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3481 sizeof(locker->info.addr)) &&
3482 watchers[i].cookie == cookie) {
3483 struct rbd_client_id cid = {
3484 .gid = le64_to_cpu(watchers[i].name.num),
3485 .handle = cookie,
3486 };
3487
3488 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3489 rbd_dev, cid.gid, cid.handle);
3490 rbd_set_owner_cid(rbd_dev, &cid);
3491 ret = 1;
3492 goto out;
3493 }
3494 }
3495
3496 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3497 ret = 0;
3498out:
3499 kfree(watchers);
3500 return ret;
3501}
3502
3503
3504
3505
3506static int rbd_try_lock(struct rbd_device *rbd_dev)
3507{
3508 struct ceph_client *client = rbd_dev->rbd_client->client;
3509 struct ceph_locker *lockers;
3510 u32 num_lockers;
3511 int ret;
3512
3513 for (;;) {
3514 ret = rbd_lock(rbd_dev);
3515 if (ret != -EBUSY)
			return ret;

		/* determine if the current lock holder is still alive */
		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3520 if (ret)
3521 return ret;
3522
3523 if (num_lockers == 0)
3524 goto again;
3525
3526 ret = find_watcher(rbd_dev, lockers);
3527 if (ret) {
3528 if (ret > 0)
3529 ret = 0;
3530 goto out;
3531 }
3532
3533 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3534 ENTITY_NAME(lockers[0].id.name));
3535
		/* evict the stale lock holder before breaking its lock */
		ret = ceph_monc_blacklist_add(&client->monc,
					      &lockers[0].info.addr);
3538 if (ret) {
3539 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3540 ENTITY_NAME(lockers[0].id.name), ret);
3541 goto out;
3542 }
3543
3544 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3545 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3546 lockers[0].id.cookie,
3547 &lockers[0].id.name);
3548 if (ret && ret != -ENOENT)
3549 goto out;
3550
3551again:
3552 ceph_free_lockers(lockers, num_lockers);
3553 }
3554
3555out:
3556 ceph_free_lockers(lockers, num_lockers);
3557 return ret;
3558}
3559
3560
3561
3562
3563static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3564 int *pret)
3565{
3566 enum rbd_lock_state lock_state;
3567
3568 down_read(&rbd_dev->lock_rwsem);
3569 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3570 rbd_dev->lock_state);
3571 if (__rbd_is_lock_owner(rbd_dev)) {
3572 lock_state = rbd_dev->lock_state;
3573 up_read(&rbd_dev->lock_rwsem);
3574 return lock_state;
3575 }
3576
3577 up_read(&rbd_dev->lock_rwsem);
3578 down_write(&rbd_dev->lock_rwsem);
3579 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3580 rbd_dev->lock_state);
3581 if (!__rbd_is_lock_owner(rbd_dev)) {
3582 *pret = rbd_try_lock(rbd_dev);
3583 if (*pret)
3584 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3585 }
3586
3587 lock_state = rbd_dev->lock_state;
3588 up_write(&rbd_dev->lock_rwsem);
3589 return lock_state;
3590}
3591
3592static void rbd_acquire_lock(struct work_struct *work)
3593{
3594 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3595 struct rbd_device, lock_dwork);
3596 enum rbd_lock_state lock_state;
3597 int ret;
3598
3599 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3600again:
3601 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3602 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3603 if (lock_state == RBD_LOCK_STATE_LOCKED)
3604 wake_requests(rbd_dev, true);
3605 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3606 rbd_dev, lock_state, ret);
3607 return;
3608 }
3609
3610 ret = rbd_request_lock(rbd_dev);
3611 if (ret == -ETIMEDOUT) {
3612 goto again;
3613 } else if (ret < 0) {
3614 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3615 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3616 RBD_RETRY_DELAY);
	} else {
		/*
		 * The lock owner acked, but resend if we don't see
		 * them release the lock.
		 */
		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
		     rbd_dev);
3624 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3625 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3626 }
3627}
3628
3629
3630
3631
3632static bool rbd_release_lock(struct rbd_device *rbd_dev)
3633{
3634 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3635 rbd_dev->lock_state);
3636 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3637 return false;
3638
3639 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
	downgrade_write(&rbd_dev->lock_rwsem);

	/*
	 * Ensure that all in-flight IO is flushed.
	 *
	 * FIXME: ceph_osdc_sync() flushes the entire OSD client,
	 * which may be shared with other devices.
	 */
	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3648 up_read(&rbd_dev->lock_rwsem);
3649
3650 down_write(&rbd_dev->lock_rwsem);
3651 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3652 rbd_dev->lock_state);
3653 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3654 return false;
3655
	if (!rbd_unlock(rbd_dev))
		/*
		 * Give others a chance to grab the lock - we would
		 * re-acquire it almost immediately if we got new IO
		 * while draining above.  lock_dwork will be requeued
		 * from rbd_wait_state_locked() after wake_requests()
		 * in rbd_handle_released_lock().
		 */
		cancel_delayed_work(&rbd_dev->lock_dwork);
3665
3666 return true;
3667}
3668
3669static void rbd_release_lock_work(struct work_struct *work)
3670{
3671 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3672 unlock_work);
3673
3674 down_write(&rbd_dev->lock_rwsem);
3675 rbd_release_lock(rbd_dev);
3676 up_write(&rbd_dev->lock_rwsem);
3677}
3678
3679static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3680 void **p)
3681{
3682 struct rbd_client_id cid = { 0 };
3683
3684 if (struct_v >= 2) {
3685 cid.gid = ceph_decode_64(p);
3686 cid.handle = ceph_decode_64(p);
3687 }
3688
3689 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3690 cid.handle);
3691 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3692 down_write(&rbd_dev->lock_rwsem);
		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			/*
			 * We already know that the remote client is
			 * the owner.
			 */
			up_write(&rbd_dev->lock_rwsem);
3699 return;
3700 }
3701
3702 rbd_set_owner_cid(rbd_dev, &cid);
3703 downgrade_write(&rbd_dev->lock_rwsem);
3704 } else {
3705 down_read(&rbd_dev->lock_rwsem);
3706 }
3707
3708 if (!__rbd_is_lock_owner(rbd_dev))
3709 wake_requests(rbd_dev, false);
3710 up_read(&rbd_dev->lock_rwsem);
3711}
3712
3713static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3714 void **p)
3715{
3716 struct rbd_client_id cid = { 0 };
3717
3718 if (struct_v >= 2) {
3719 cid.gid = ceph_decode_64(p);
3720 cid.handle = ceph_decode_64(p);
3721 }
3722
3723 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3724 cid.handle);
3725 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3726 down_write(&rbd_dev->lock_rwsem);
3727 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3728 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3729 __func__, rbd_dev, cid.gid, cid.handle,
3730 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3731 up_write(&rbd_dev->lock_rwsem);
3732 return;
3733 }
3734
3735 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3736 downgrade_write(&rbd_dev->lock_rwsem);
3737 } else {
3738 down_read(&rbd_dev->lock_rwsem);
3739 }
3740
3741 if (!__rbd_is_lock_owner(rbd_dev))
3742 wake_requests(rbd_dev, false);
3743 up_read(&rbd_dev->lock_rwsem);
3744}
3745
3746static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3747 void **p)
3748{
3749 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3750 struct rbd_client_id cid = { 0 };
3751 bool need_to_send;
3752
3753 if (struct_v >= 2) {
3754 cid.gid = ceph_decode_64(p);
3755 cid.handle = ceph_decode_64(p);
3756 }
3757
3758 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3759 cid.handle);
3760 if (rbd_cid_equal(&cid, &my_cid))
3761 return false;
3762
3763 down_read(&rbd_dev->lock_rwsem);
3764 need_to_send = __rbd_is_lock_owner(rbd_dev);
3765 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3766 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3767 dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3768 rbd_dev);
3769 queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3770 }
3771 }
3772 up_read(&rbd_dev->lock_rwsem);
3773 return need_to_send;
3774}
3775
3776static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3777 u64 notify_id, u64 cookie, s32 *result)
3778{
3779 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	/* fixed-size buffer avoids a variable-length array */
	char buf[4 + CEPH_ENCODING_START_BLK_LEN];
	int buf_size = sizeof(buf);
3782 int ret;
3783
3784 if (result) {
		void *p = buf;

		/* encode ResponseMessage */
		ceph_start_encoding(&p, 1, 1,
3789 buf_size - CEPH_ENCODING_START_BLK_LEN);
3790 ceph_encode_32(&p, *result);
3791 } else {
3792 buf_size = 0;
3793 }
3794
3795 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3796 &rbd_dev->header_oloc, notify_id, cookie,
3797 buf, buf_size);
3798 if (ret)
3799 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3800}
3801
3802static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3803 u64 cookie)
3804{
3805 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3807}
3808
3809static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3810 u64 notify_id, u64 cookie, s32 result)
3811{
3812 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3813 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3814}
3815
3816static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3817 u64 notifier_id, void *data, size_t data_len)
3818{
3819 struct rbd_device *rbd_dev = arg;
3820 void *p = data;
3821 void *const end = p + data_len;
3822 u8 struct_v = 0;
3823 u32 len;
3824 u32 notify_op;
3825 int ret;
3826
3827 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3828 __func__, rbd_dev, cookie, notify_id, data_len);
3829 if (data_len) {
3830 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3831 &struct_v, &len);
3832 if (ret) {
3833 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3834 ret);
3835 return;
3836 }
3837
3838 notify_op = ceph_decode_32(&p);
	} else {
		/* legacy notification for header updates */
		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3842 len = 0;
3843 }
3844
3845 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3846 switch (notify_op) {
3847 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3848 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3849 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3850 break;
3851 case RBD_NOTIFY_OP_RELEASED_LOCK:
3852 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3853 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3854 break;
3855 case RBD_NOTIFY_OP_REQUEST_LOCK:
		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
			/*
			 * send ResponseMessage(0) back so the client
			 * can detect a missing owner
			 */
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, 0);
3863 else
3864 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3865 break;
3866 case RBD_NOTIFY_OP_HEADER_UPDATE:
3867 ret = rbd_dev_refresh(rbd_dev);
3868 if (ret)
3869 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3870
3871 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3872 break;
3873 default:
3874 if (rbd_is_lock_owner(rbd_dev))
3875 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3876 cookie, -EOPNOTSUPP);
3877 else
3878 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3879 break;
3880 }
3881}
3882
3883static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3884
3885static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3886{
3887 struct rbd_device *rbd_dev = arg;
3888
3889 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3890
3891 down_write(&rbd_dev->lock_rwsem);
3892 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3893 up_write(&rbd_dev->lock_rwsem);
3894
3895 mutex_lock(&rbd_dev->watch_mutex);
3896 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3897 __rbd_unregister_watch(rbd_dev);
3898 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3899
3900 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3901 }
3902 mutex_unlock(&rbd_dev->watch_mutex);
3903}
3904
3905
3906
3907
3908static int __rbd_register_watch(struct rbd_device *rbd_dev)
3909{
3910 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3911 struct ceph_osd_linger_request *handle;
3912
3913 rbd_assert(!rbd_dev->watch_handle);
3914 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3915
3916 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3917 &rbd_dev->header_oloc, rbd_watch_cb,
3918 rbd_watch_errcb, rbd_dev);
3919 if (IS_ERR(handle))
3920 return PTR_ERR(handle);
3921
3922 rbd_dev->watch_handle = handle;
3923 return 0;
3924}
3925
3926
3927
3928
3929static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3930{
3931 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3932 int ret;
3933
3934 rbd_assert(rbd_dev->watch_handle);
3935 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3936
3937 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3938 if (ret)
3939 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3940
3941 rbd_dev->watch_handle = NULL;
3942}
3943
3944static int rbd_register_watch(struct rbd_device *rbd_dev)
3945{
3946 int ret;
3947
3948 mutex_lock(&rbd_dev->watch_mutex);
3949 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3950 ret = __rbd_register_watch(rbd_dev);
3951 if (ret)
3952 goto out;
3953
3954 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3955 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3956
3957out:
3958 mutex_unlock(&rbd_dev->watch_mutex);
3959 return ret;
3960}
3961
3962static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3963{
3964 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3965
3966 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3967 cancel_work_sync(&rbd_dev->acquired_lock_work);
3968 cancel_work_sync(&rbd_dev->released_lock_work);
3969 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3970 cancel_work_sync(&rbd_dev->unlock_work);
3971}
3972
3973static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3974{
3975 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3976 cancel_tasks_sync(rbd_dev);
3977
3978 mutex_lock(&rbd_dev->watch_mutex);
3979 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3980 __rbd_unregister_watch(rbd_dev);
3981 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3982 mutex_unlock(&rbd_dev->watch_mutex);
3983
3984 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3985}
3986
3987static void rbd_reregister_watch(struct work_struct *work)
3988{
3989 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3990 struct rbd_device, watch_dwork);
3991 bool was_lock_owner = false;
3992 bool need_to_wake = false;
3993 int ret;
3994
3995 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3996
3997 down_write(&rbd_dev->lock_rwsem);
3998 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3999 was_lock_owner = rbd_release_lock(rbd_dev);
4000
4001 mutex_lock(&rbd_dev->watch_mutex);
4002 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4003 mutex_unlock(&rbd_dev->watch_mutex);
4004 goto out;
4005 }
4006
4007 ret = __rbd_register_watch(rbd_dev);
4008 if (ret) {
4009 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4010 if (ret == -EBLACKLISTED || ret == -ENOENT) {
4011 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
4012 need_to_wake = true;
4013 } else {
4014 queue_delayed_work(rbd_dev->task_wq,
4015 &rbd_dev->watch_dwork,
4016 RBD_RETRY_DELAY);
4017 }
4018 mutex_unlock(&rbd_dev->watch_mutex);
4019 goto out;
4020 }
4021
4022 need_to_wake = true;
4023 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4024 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4025 mutex_unlock(&rbd_dev->watch_mutex);
4026
4027 ret = rbd_dev_refresh(rbd_dev);
4028 if (ret)
		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4030
4031 if (was_lock_owner) {
4032 ret = rbd_try_lock(rbd_dev);
4033 if (ret)
			rbd_warn(rbd_dev, "reregistration lock failed: %d",
4035 ret);
4036 }
4037
4038out:
4039 up_write(&rbd_dev->lock_rwsem);
4040 if (need_to_wake)
4041 wake_requests(rbd_dev, true);
4042}
4043
4044
4045
4046
4047
4048static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4049 const char *object_name,
4050 const char *class_name,
4051 const char *method_name,
4052 const void *outbound,
4053 size_t outbound_size,
4054 void *inbound,
4055 size_t inbound_size)
4056{
4057 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4058 struct rbd_obj_request *obj_request;
4059 struct page **pages;
4060 u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
4071 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4072 if (IS_ERR(pages))
4073 return PTR_ERR(pages);
4074
4075 ret = -ENOMEM;
4076 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
4077 OBJ_REQUEST_PAGES);
4078 if (!obj_request)
4079 goto out;
4080
4081 obj_request->pages = pages;
4082 obj_request->page_count = page_count;
4083
4084 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4085 obj_request);
4086 if (!obj_request->osd_req)
4087 goto out;
4088
4089 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
4090 class_name, method_name);
4091 if (outbound_size) {
4092 struct ceph_pagelist *pagelist;
4093
4094 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4095 if (!pagelist)
4096 goto out;
4097
4098 ceph_pagelist_init(pagelist);
4099 ceph_pagelist_append(pagelist, outbound, outbound_size);
4100 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4101 pagelist);
4102 }
4103 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4104 obj_request->pages, inbound_size,
4105 0, false, false);
4106 rbd_osd_req_format_read(obj_request);
4107
4108 ret = rbd_obj_request_submit(osdc, obj_request);
4109 if (ret)
4110 goto out;
4111 ret = rbd_obj_request_wait(obj_request);
4112 if (ret)
4113 goto out;
4114
4115 ret = obj_request->result;
4116 if (ret < 0)
4117 goto out;
4118
4119 rbd_assert(obj_request->xferred < (u64)INT_MAX);
4120 ret = (int)obj_request->xferred;
4121 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
4122out:
4123 if (obj_request)
4124 rbd_obj_request_put(obj_request);
4125 else
4126 ceph_release_page_vector(pages, page_count);
4127
4128 return ret;
4129}
4130
4131
4132
4133
4134static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4135{
4136 DEFINE_WAIT(wait);
4137
	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4144 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4145 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4146 TASK_UNINTERRUPTIBLE);
4147 up_read(&rbd_dev->lock_rwsem);
4148 schedule();
4149 down_read(&rbd_dev->lock_rwsem);
4150 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4151 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4152
4153 finish_wait(&rbd_dev->lock_waitq, &wait);
4154}
4155
4156static void rbd_queue_workfn(struct work_struct *work)
4157{
4158 struct request *rq = blk_mq_rq_from_pdu(work);
4159 struct rbd_device *rbd_dev = rq->q->queuedata;
4160 struct rbd_img_request *img_request;
4161 struct ceph_snap_context *snapc = NULL;
4162 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4163 u64 length = blk_rq_bytes(rq);
4164 enum obj_operation_type op_type;
4165 u64 mapping_size;
4166 bool must_be_locked;
4167 int result;
4168
4169 if (rq->cmd_type != REQ_TYPE_FS) {
4170 dout("%s: non-fs request type %d\n", __func__,
4171 (int) rq->cmd_type);
4172 result = -EIO;
4173 goto err;
4174 }
4175
4176 if (rq->cmd_flags & REQ_DISCARD)
4177 op_type = OBJ_OP_DISCARD;
4178 else if (rq->cmd_flags & REQ_WRITE)
4179 op_type = OBJ_OP_WRITE;
4180 else
		op_type = OBJ_OP_READ;

	/* Ignore/skip any zero-length requests */
	if (!length) {
4186 dout("%s: zero-length request\n", __func__);
4187 result = 0;
4188 goto err_rq;
4189 }
4190
4191
4192
4193 if (op_type != OBJ_OP_READ) {
4194 if (rbd_dev->mapping.read_only) {
4195 result = -EROFS;
4196 goto err_rq;
4197 }
4198 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4199 }
4200
4201
4202
4203
4204
4205
4206
4207 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4208 dout("request for non-existent snapshot");
4209 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4210 result = -ENXIO;
4211 goto err_rq;
4212 }
4213
4214 if (offset && length > U64_MAX - offset + 1) {
4215 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4216 length);
4217 result = -EINVAL;
4218 goto err_rq;
4219 }
4220
4221 blk_mq_start_request(rq);
4222
4223 down_read(&rbd_dev->header_rwsem);
4224 mapping_size = rbd_dev->mapping.size;
4225 if (op_type != OBJ_OP_READ) {
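		/* take a ref on the snapshot context for this write/discard */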
4226 snapc = rbd_dev->header.snapc;
4227 ceph_get_snap_context(snapc);
4228 must_be_locked = rbd_is_lock_supported(rbd_dev);
4229 } else {
4230 must_be_locked = rbd_dev->opts->lock_on_read &&
4231 rbd_is_lock_supported(rbd_dev);
4232 }
4233 up_read(&rbd_dev->header_rwsem);
4234
4235 if (offset + length > mapping_size) {
4236 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4237 length, mapping_size);
4238 result = -EIO;
4239 goto err_rq;
4240 }
4241
4242 if (must_be_locked) {
4243 down_read(&rbd_dev->lock_rwsem);
4244 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4245 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4246 rbd_wait_state_locked(rbd_dev);
4247
4248 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4249 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4250 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4251 result = -EBLACKLISTED;
4252 goto err_unlock;
4253 }
4254 }
4255
4256 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4257 snapc);
4258 if (!img_request) {
4259 result = -ENOMEM;
4260 goto err_unlock;
4261 }
4262 img_request->rq = rq;
	snapc = NULL; /* img_request consumes a ref */
4264
4265 if (op_type == OBJ_OP_DISCARD)
4266 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4267 NULL);
4268 else
4269 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4270 rq->bio);
4271 if (result)
4272 goto err_img_request;
4273
4274 result = rbd_img_request_submit(img_request);
4275 if (result)
4276 goto err_img_request;
4277
4278 if (must_be_locked)
4279 up_read(&rbd_dev->lock_rwsem);
4280 return;
4281
4282err_img_request:
4283 rbd_img_request_put(img_request);
4284err_unlock:
4285 if (must_be_locked)
4286 up_read(&rbd_dev->lock_rwsem);
4287err_rq:
4288 if (result)
4289 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4290 obj_op_name(op_type), length, offset, result);
4291 ceph_put_snap_context(snapc);
4292err:
4293 blk_mq_end_request(rq, result);
4294}
4295
4296static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4297 const struct blk_mq_queue_data *bd)
4298{
4299 struct request *rq = bd->rq;
4300 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4301
4302 queue_work(rbd_wq, work);
4303 return BLK_MQ_RQ_QUEUE_OK;
4304}
4305
4306
4307
4308
4309
4310
4311static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
4312 struct bio_vec *bvec)
4313{
4314 struct rbd_device *rbd_dev = q->queuedata;
4315 sector_t sector_offset;
4316 sector_t sectors_per_obj;
4317 sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;
4348
4349 return ret;
4350}
4351
4352static void rbd_free_disk(struct rbd_device *rbd_dev)
4353{
4354 struct gendisk *disk = rbd_dev->disk;
4355
4356 if (!disk)
4357 return;
4358
4359 rbd_dev->disk = NULL;
4360 if (disk->flags & GENHD_FL_UP) {
4361 del_gendisk(disk);
4362 if (disk->queue)
4363 blk_cleanup_queue(disk->queue);
4364 blk_mq_free_tag_set(&rbd_dev->tag_set);
4365 }
4366 put_disk(disk);
4367}
4368
4369static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4370 const char *object_name,
4371 u64 offset, u64 length, void *buf)
4372
4373{
4374 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4375 struct rbd_obj_request *obj_request;
4376 struct page **pages = NULL;
4377 u32 page_count;
4378 size_t size;
4379 int ret;
4380
4381 page_count = (u32) calc_pages_for(offset, length);
4382 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4383 if (IS_ERR(pages))
4384 return PTR_ERR(pages);
4385
4386 ret = -ENOMEM;
4387 obj_request = rbd_obj_request_create(object_name, offset, length,
4388 OBJ_REQUEST_PAGES);
4389 if (!obj_request)
4390 goto out;
4391
4392 obj_request->pages = pages;
4393 obj_request->page_count = page_count;
4394
4395 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4396 obj_request);
4397 if (!obj_request->osd_req)
4398 goto out;
4399
4400 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4401 offset, length, 0, 0);
4402 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
4403 obj_request->pages,
4404 obj_request->length,
4405 obj_request->offset & ~PAGE_MASK,
4406 false, false);
4407 rbd_osd_req_format_read(obj_request);
4408
4409 ret = rbd_obj_request_submit(osdc, obj_request);
4410 if (ret)
4411 goto out;
4412 ret = rbd_obj_request_wait(obj_request);
4413 if (ret)
4414 goto out;
4415
4416 ret = obj_request->result;
4417 if (ret < 0)
4418 goto out;
4419
4420 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4421 size = (size_t) obj_request->xferred;
4422 ceph_copy_from_page_vector(pages, buf, 0, size);
4423 rbd_assert(size <= (size_t)INT_MAX);
4424 ret = (int)size;
4425out:
4426 if (obj_request)
4427 rbd_obj_request_put(obj_request);
4428 else
4429 ceph_release_page_vector(pages, page_count);
4430
4431 return ret;
4432}
4433
4434
4435
4436
4437
4438
4439static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4440{
4441 struct rbd_image_header_ondisk *ondisk = NULL;
4442 u32 snap_count = 0;
4443 u64 names_size = 0;
4444 u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
4455 size_t size;
4456
4457 kfree(ondisk);
4458
4459 size = sizeof (*ondisk);
4460 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4461 size += names_size;
4462 ondisk = kmalloc(size, GFP_KERNEL);
4463 if (!ondisk)
4464 return -ENOMEM;
4465
4466 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
4467 0, size, ondisk);
4468 if (ret < 0)
4469 goto out;
4470 if ((size_t)ret < size) {
4471 ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4473 size, ret);
4474 goto out;
4475 }
4476 if (!rbd_dev_ondisk_valid(ondisk)) {
4477 ret = -ENXIO;
4478 rbd_warn(rbd_dev, "invalid header");
4479 goto out;
4480 }
4481
4482 names_size = le64_to_cpu(ondisk->snap_names_len);
4483 want_count = snap_count;
4484 snap_count = le32_to_cpu(ondisk->snap_count);
4485 } while (snap_count != want_count);
4486
4487 ret = rbd_header_from_disk(rbd_dev, ondisk);
4488out:
4489 kfree(ondisk);
4490
4491 return ret;
4492}
4493
4494
4495
4496
4497
4498static void rbd_exists_validate(struct rbd_device *rbd_dev)
4499{
4500 u64 snap_id;
4501
4502 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4503 return;
4504
4505 snap_id = rbd_dev->spec->snap_id;
4506 if (snap_id == CEPH_NOSNAP)
4507 return;
4508
4509 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4510 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4511}
4512
4513static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4514{
	sector_t size;

	/*
	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
	 * try to update its size.  If REMOVING is set, updating size
	 * is just useless work since the device can't be opened.
	 */
	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4524 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4525 dout("setting size to %llu sectors", (unsigned long long)size);
4526 set_capacity(rbd_dev->disk, size);
4527 revalidate_disk(rbd_dev->disk);
4528 }
4529}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto out;

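	/*
	 * If there is a parent, see if it has disappeared due to the
	 * mapped image getting flattened.
	 */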
	if (rbd_dev->parent) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out;
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
	} else {
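		/* validate mapped snapshot's EXISTS flag */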
		rbd_exists_validate(rbd_dev);
	}

out:
	up_write(&rbd_dev->header_rwsem);
	if (!ret && mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
}

static int rbd_init_request(void *data, struct request *rq,
			    unsigned int hctx_idx, unsigned int request_idx,
			    unsigned int numa_node)
{
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	INIT_WORK(work, rbd_queue_workfn);
	return 0;
}

static struct blk_mq_ops rbd_mq_ops = {
	.queue_rq	= rbd_queue_rq,
	.map_queue	= blk_mq_map_queue,
	.init_request	= rbd_init_request,
};

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;
	int err;

	disk = alloc_disk(single_major ?
			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
			  RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = rbd_dev->minor;
	if (single_major)
		disk->flags |= GENHD_FL_EXT_DEVT;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);

	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
	if (err)
		goto out_disk;

	q = blk_mq_init_queue(&rbd_dev->tag_set);
	if (IS_ERR(q)) {
		err = PTR_ERR(q);
		goto out_tag_set;
	}

	queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);

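	/* set io sizes to object size */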
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	q->limits.max_sectors = queue_max_hw_sectors(q);
	blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

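	/* enable the discard support */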
	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
	q->limits.discard_granularity = segment_size;
	q->limits.discard_alignment = segment_size;
	q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
	q->limits.discard_zeroes_data = 1;

	blk_queue_merge_bvec(q, rbd_merge_bvec);

	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
		q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;

	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_tag_set:
	blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
	put_disk(disk);
	return err;
}

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long)rbd_dev->mapping.size);
}

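/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */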
static ssize_t rbd_features_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
		       (unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_minor_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->minor);
}

static ssize_t rbd_client_addr_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct ceph_entity_addr *client_addr =
	    ceph_client_addr(rbd_dev->rbd_client->client);

	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
		       le32_to_cpu(client_addr->nonce));
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_gid(rbd_dev->rbd_client->client));
}

static ssize_t rbd_cluster_fsid_show(struct device *dev,
				     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
}

static ssize_t rbd_config_info_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->config_info);
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

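/*
 * Shows the name of the currently-mapped snapshot, or
 * RBD_SNAP_HEAD_NAME for the base image.
 */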
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
}

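/*
 * For an rbd v2 image, shows the chain of parent images, separated by
 * empty lines.  For v1 images or if there is no parent, shows
 * "(no parent image)".
 */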
static ssize_t rbd_parent_show(struct device *dev,
			       struct device_attribute *attr,
			       char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	ssize_t count = 0;

	if (!rbd_dev->parent)
		return sprintf(buf, "(no parent image)\n");

	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
		struct rbd_spec *spec = rbd_dev->parent_spec;

		count += sprintf(&buf[count], "%s"
			    "pool_id %llu\npool_name %s\n"
			    "image_id %s\nimage_name %s\n"
			    "snap_id %llu\nsnap_name %s\n"
			    "overlap %llu\n",
			    !count ? "" : "\n",
			    spec->pool_id, spec->pool_name,
			    spec->image_id, spec->image_name ?: "(unknown)",
			    spec->snap_id, spec->snap_name,
			    rbd_dev->parent_overlap);
	}

	return count;
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		return ret;

	return size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_minor.attr,
	&dev_attr_client_addr.attr,
	&dev_attr_client_id.attr,
	&dev_attr_cluster_fsid.attr,
	&dev_attr_config_info.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_dev_release(struct device *dev);

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;

	spec->pool_id = CEPH_NOPOOL;
	spec->snap_id = CEPH_NOSNAP;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static void rbd_dev_free(struct rbd_device *rbd_dev)
{
	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);

	ceph_oid_destroy(&rbd_dev->header_oid);
	kfree(rbd_dev->config_info);

	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	bool need_put = !!rbd_dev->opts;

	if (need_put) {
		destroy_workqueue(rbd_dev->task_wq);
		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
	}

	rbd_dev_free(rbd_dev);

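	/*
	 * need_put was sampled before rbd_dev was freed, so the
	 * module reference is dropped only after everything that
	 * belongs to this device has been released.
	 */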
	if (need_put)
		module_put(THIS_MODULE);
}

static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
					   struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	ceph_oid_init(&rbd_dev->header_oid);
	ceph_oloc_init(&rbd_dev->header_oloc);

	mutex_init(&rbd_dev->watch_mutex);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);

	init_rwsem(&rbd_dev->lock_rwsem);
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
	init_waitqueue_head(&rbd_dev->lock_waitq);

	rbd_dev->dev.bus = &rbd_bus_type;
	rbd_dev->dev.type = &rbd_device_type;
	rbd_dev->dev.parent = &rbd_root_dev;
	device_initialize(&rbd_dev->dev);

	rbd_dev->rbd_client = rbdc;
	rbd_dev->spec = spec;

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

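/*
 * Create a mapping rbd_dev.  On success the returned device owns the
 * caller's references on rbdc and spec (rbd_dev_free() drops them).
 */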
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
{
	struct rbd_device *rbd_dev;

	rbd_dev = __rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		return NULL;

	rbd_dev->opts = opts;

	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
					 minor_to_rbd_dev_id(1 << MINORBITS),
					 GFP_KERNEL);
	if (rbd_dev->dev_id < 0)
		goto fail_rbd_dev;

	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
						   rbd_dev->name);
	if (!rbd_dev->task_wq)
		goto fail_dev_id;

	__module_get(THIS_MODULE);

	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
	return rbd_dev;

fail_dev_id:
	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
fail_rbd_dev:
	rbd_dev_free(rbd_dev);
	return NULL;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	if (rbd_dev)
		put_device(&rbd_dev->dev);
}

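/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */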
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_size",
				  &snapid, sizeof (snapid),
				  &size_buf, sizeof (size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order) {
		*order = size_buf.order;
		dout(" order %u", (unsigned int)*order);
	}
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx snap_size = %llu\n",
	     (unsigned long long)snap_id,
	     (unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
				     &rbd_dev->header.obj_order,
				     &rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_object_prefix", NULL, 0,
				  reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				     u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 unsup;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_features",
				  &snapid, sizeof (snapid),
				  &features_buf, sizeof (features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
	if (unsup) {
		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
			 unsup);
		return -ENXIO;
	}

	*snap_features = le64_to_cpu(features_buf.features);

	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
	     (unsigned long long)snap_id,
	     (unsigned long long)*snap_features,
	     (unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
					 &rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	u64 pool_id;
	char *image_id;
	u64 snap_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +
	       sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +
	       sizeof (__le64) +
	       sizeof (__le64);
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_parent",
				  &snapid, sizeof (snapid),
				  reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, pool_id, out_err);
	if (pool_id == CEPH_NOPOOL) {
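		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */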
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;
	}

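	/* The ceph file layout needs to fit pool id in 32 bits */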
	ret = -EIO;
	if (pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
			 (unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	ceph_decode_64_safe(&p, end, snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

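	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec we have not already done so.
	 */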
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pool_id;
		parent_spec->image_id = image_id;
		parent_spec->snap_id = snap_id;
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;
	} else {
		kfree(image_id);
	}

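	/*
	 * We always update the parent overlap.  If it's zero we issue
	 * a warning, as we will proceed as if there was no parent.
	 */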
	if (!overlap) {
		if (parent_spec) {

			if (rbd_dev->parent_overlap)
				rbd_warn(rbd_dev,
					 "clone now standalone (overlap became 0)");
		} else {

			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
		}
	}
	rbd_dev->parent_overlap = overlap;

out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_stripe_unit_count", NULL, 0,
				  (char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

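	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */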
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit (got %llu want %llu)",
			 stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count (got %llu want 1)",
			 stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				  "rbd", "dir_get_name",
				  image_id, image_id_size,
				  reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

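	/* Skip over names until we find the one we are looking for */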
	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {

			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

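/*
 * Look up the snapshot id for the given snapshot name, using the
 * format-appropriate method.  Returns CEPH_NOSNAP if not found.
 */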
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

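/*
 * An image being mapped will have everything but the snap id.
 */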
static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;

	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
	rbd_assert(spec->image_id && spec->image_name);
	rbd_assert(spec->snap_name);

	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
		u64 snap_id;

		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
		if (snap_id == CEPH_NOSNAP)
			return -ENOENT;

		spec->snap_id = snap_id;
	} else {
		spec->snap_id = CEPH_NOSNAP;
	}

	return 0;
}

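/*
 * A parent image will have all ids but none of the names.
 *
 * All names in an rbd spec are dynamically allocated.  It's OK if we
 * can't figure out the name for an image id.
 */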
static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	rbd_assert(spec->pool_id != CEPH_NOPOOL);
	rbd_assert(spec->image_id);
	rbd_assert(spec->snap_id != CEPH_NOSNAP);

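	/* Get the pool name; we have to make our own copy of this */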
	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

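	/* Fetch the image name; tolerate failure here */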
	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

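	/* Fetch the snapshot name */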
	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;

out_err:
	kfree(image_name);
	kfree(pool_name);
	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

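	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count (u32), and then the snapshot ids (each a u64).
	 */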
	size = sizeof (__le64) + sizeof (__le32) +
	       RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_snapcontext", NULL, 0,
				  reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

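	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */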
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				/ sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout(" snap context seq = %llu, snap_count = %u\n",
	     (unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}

static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_snapshot_name",
				  &snapid, sizeof (snapid),
				  reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout(" snap_id 0x%016llx snap_name = %s\n",
	     (unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	ret = rbd_dev_v2_snap_context(rbd_dev);
	if (ret && first_time) {
		kfree(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	}

	return ret;
}

static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_header_info(rbd_dev);

	return rbd_dev_v2_header_info(rbd_dev);
}

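/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-white space character (if any).  Returns the length
 * of the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */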
static inline size_t next_token(const char **buf)
{
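	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */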
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);

	return strcspn(*buf, spaces);
}

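/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */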
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

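/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to "/sys/bus/rbd/add",
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  rbd_spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */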
static int rbd_add_parse_args(const char *buf,
			      struct ceph_options **ceph_opts,
			      struct rbd_options **opts,
			      struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME;
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
				   mon_addrs + mon_addrs_size - 1,
				   parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

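/*
 * Return pool id (>= 0) or a negative error code.
 */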
static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
{
	struct ceph_options *opts = rbdc->client->options;
	u64 newest_epoch;
	int tries = 0;
	int ret;

again:
	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
	if (ret == -ENOENT && tries++ < 1) {
		ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
					    &newest_epoch);
		if (ret < 0)
			return ret;

		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
			ceph_osdc_maybe_request_map(&rbdc->client->osdc);
			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
						     newest_epoch,
						     opts->mount_timeout);
			goto again;
		} else {

			return -ENOENT;
		}
	}

	return ret;
}

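/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the given name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */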
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

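	/*
	 * When probing a parent image, the image id is already
	 * provided (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */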
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

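	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */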
5971
5972 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5973 object_name = kmalloc(size, GFP_NOIO);
5974 if (!object_name)
5975 return -ENOMEM;
5976 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5977 dout("rbd id object name is %s\n", object_name);
5978
5979
5980
5981 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5982 response = kzalloc(size, GFP_NOIO);
5983 if (!response) {
5984 ret = -ENOMEM;
5985 goto out;
5986 }
5987
5988
5989
5990 ret = rbd_obj_method_sync(rbd_dev, object_name,
5991 "rbd", "get_id", NULL, 0,
5992 response, RBD_IMAGE_ID_LEN_MAX);
5993 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5994 if (ret == -ENOENT) {
5995 image_id = kstrdup("", GFP_KERNEL);
5996 ret = image_id ? 0 : -ENOMEM;
5997 if (!ret)
5998 rbd_dev->image_format = 1;
5999 } else if (ret >= 0) {
6000 void *p = response;
6001
6002 image_id = ceph_extract_encoded_string(&p, p + ret,
6003 NULL, GFP_NOIO);
6004 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
6005 if (!ret)
6006 rbd_dev->image_format = 2;
6007 }
6008
6009 if (!ret) {
6010 rbd_dev->spec->image_id = image_id;
6011 dout("image_id is %s\n", image_id);
6012 }
6013out:
6014 kfree(response);
6015 kfree(object_name);
6016
6017 return ret;
6018}
6019
6020
6021
6022
6023
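/*
 * Undo whatever state changes are made by v1 or v2 header info
 * routines.
 */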
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	rbd_dev_parent_put(rbd_dev);

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	return 0;
out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

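/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */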
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
	struct rbd_device *parent = NULL;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;

	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
		pr_info("parent chain is too long (%d)\n", depth);
		ret = -EINVAL;
		goto out_err;
	}

	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
	}

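	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */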
	__rbd_get_client(rbd_dev->rbd_client);
	rbd_spec_get(rbd_dev->parent_spec);

	ret = rbd_dev_image_probe(parent, depth);
	if (ret < 0)
		goto out_err;

	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);
	return 0;

out_err:
	rbd_dev_unparent(rbd_dev);
	rbd_dev_destroy(parent);
	return ret;
}

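/*
 * rbd_dev->header_rwsem must be held for write and will be unlocked
 * upon return.
 */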
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);

	dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	ret = device_add(&rbd_dev->dev);
	if (ret)
		goto err_out_mapping;

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	add_disk(rbd_dev->disk);
	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);

	return ret;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

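/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */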
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

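	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise we have pool id, image id, and snap id - need to
	 * fill in names for those ids.
	 */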
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;

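		/*
		 * Need to warn users if this image is the one being
		 * mapped and has a parent.
		 */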
		if (!depth && rbd_dev->parent_spec)
			rbd_warn(rbd_dev,
				 "WARNING: kernel layering is EXPERIMENTAL!");
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
	     rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	bool read_only;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

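	/* The ceph file layout needs to fit pool id in 32 bits */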
	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)",
			 (unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;
	spec = NULL;
	rbd_opts = NULL;

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0) {
		up_write(&rbd_dev->header_rwsem);
		goto err_out_rbd_dev;
	}

	read_only = rbd_dev->opts->read_only;
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
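		/*
		 * The watch must be torn down before the image is
		 * released; rbd_dev_image_release() does not do it
		 * itself.
		 */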
		rbd_unregister_watch(rbd_dev);
		rbd_dev_image_release(rbd_dev);
		goto out;
	}

	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	rbd_free_disk(rbd_dev);

	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	device_del(&rbd_dev->dev);
	rbd_dev_mapping_clear(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

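		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */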
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	char opt_buf[6];
	bool already = false;
	bool force = false;
	int ret;

	dev_id = -1;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
	if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count && !force)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
						   &rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	if (force) {
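		/*
		 * Prevent new IO from being queued and wait for existing
		 * IO to complete/fail.
		 */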
		blk_mq_freeze_queue(rbd_dev->disk->queue);
		blk_set_queue_dying(rbd_dev->disk->queue);
	}

	down_write(&rbd_dev->lock_rwsem);
	if (__rbd_is_lock_owner(rbd_dev))
		rbd_unlock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
	rbd_unregister_watch(rbd_dev);

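	/*
	 * The watch was just unregistered, so no new notifies will
	 * arrive while the device and image are torn down below.
	 */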
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);

	return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

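/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */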
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

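	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */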
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	rbd_bus_type.bus_attrs = rbd_bus_attrs;
	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
		rbd_bus_type.bus_attrs = rbd_bus_attrs_single_major;
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");

MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");