/*
 * rbd.c -- Export ceph rados objects as a Linux block device
 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter unless it is already 0 (a 0 counter is
 * "sticky" and is never incremented) and return the result.  If the
 * increment would take the counter past INT_MAX it is undone and
 * -EINVAL is returned.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
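/*
 * Illustration (not upstream code): the two helpers above implement a
 * saturating reference count on top of atomic_t.  Assuming a counter
 * currently at 1, a typical get/put sequence looks like:
 *
 *	if (atomic_inc_return_safe(&ref) > 0) {
 *		...use the referenced object...
 *		atomic_dec_return_safe(&ref);	// back to 1
 *	}
 *
 * Once the counter drops to 0 it stays there, so a stale get after the
 * final put cannot resurrect the object.
 */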

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid snapshot index */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_OBJECT_MAP |	\
				 RBD_FEATURE_FAST_DIFF |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
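
/*
 * Example (illustrative): with all eight feature bits above set (note
 * that bit 6 is unused here), the mask is 0x1bf, which is what the
 * "supported_features" bus attribute below reports, e.g. via
 * /sys/bus/rbd/supported_features.
 */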

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct pending_result {
	int			result;		/* first nonzero result */
	int			num_pending;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
	OBJ_OP_ZEROOUT,
};

#define RBD_OBJ_FLAG_DELETION			(1U << 0)
#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)

enum rbd_obj_read_state {
	RBD_OBJ_READ_START = 1,
	RBD_OBJ_READ_OBJECT,
	RBD_OBJ_READ_PARENT,
};

/*
 * Writes go through the following state machine to deal with layering
 * and the object map:
 *
 * A plain write starts in RBD_OBJ_WRITE_START, optionally marks the
 * object in the object map (PRE_OBJECT_MAP), issues the OSD write
 * (OBJECT) and finishes by updating the object map again
 * (POST_OBJECT_MAP).
 *
 * If the target object may not exist and the image has a parent, the
 * write is guarded.  On -ENOENT the request enters the copyup state
 * machine below: the backing extents are read from the parent
 * (READ_PARENT), object maps of in-flight snapshots are updated
 * (OBJECT_MAPS) and the parent data (or zeros) is written together
 * with the original write ops in a single "copyup" class call
 * (WRITE_OBJECT).  States prefixed with __ indicate that the
 * corresponding step has been submitted and is still pending.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_START = 1,
	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
	RBD_OBJ_WRITE_OBJECT,
	__RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_POST_OBJECT_MAP,
};

enum rbd_obj_copyup_state {
	RBD_OBJ_COPYUP_START = 1,
	RBD_OBJ_COPYUP_READ_PARENT,
	__RBD_OBJ_COPYUP_OBJECT_MAPS,
	RBD_OBJ_COPYUP_OBJECT_MAPS,
	__RBD_OBJ_COPYUP_WRITE_OBJECT,
	RBD_OBJ_COPYUP_WRITE_OBJECT,
};
struct rbd_obj_request {
	struct ceph_object_extent ex;
	unsigned int		flags;	/* RBD_OBJ_FLAG_* */
	union {
		enum rbd_obj_read_state	 read_state;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};

	enum rbd_obj_copyup_state copyup_state;
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct list_head	osd_reqs;	/* w/ r_private_item */

	struct mutex		state_mutex;
	struct pending_result	pending;
	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

enum rbd_img_state {
	RBD_IMG_START = 1,
	RBD_IMG_EXCLUSIVE_LOCK,
	__RBD_IMG_OBJECT_REQUESTS,
	RBD_IMG_OBJECT_REQUESTS,
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	enum rbd_img_state	state;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	struct rbd_obj_request	*obj_request;	/* obj req initiator */

	struct list_head	lock_item;
	struct list_head	object_extents;	/* obj_req.ex structs */

	struct mutex		state_mutex;
	struct pending_result	pending;
	struct work_struct	work;
	int			work_result;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64	size;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	spinlock_t		lock_lists_lock;
	struct list_head	acquiring_list;
	struct list_head	running_list;
	struct completion	acquire_wait;
	int			acquire_err;
	struct completion	releasing_wait;

	spinlock_t		object_map_lock;
	u8			*object_map;
	u64			object_map_size;	/* in objects */
	u64			object_map_flags;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects the image header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_READONLY,	/* -o ro or snapshot */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

static struct ceph_snap_context rbd_empty_snapc = {
	.nref = REFCOUNT_INIT(1),
};

static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
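
/*
 * Illustration (not upstream code): with RBD_SINGLE_MAJOR_PART_SHIFT
 * of 4, each device owns 16 consecutive minors, leaving room for 15
 * partitions.  E.g. dev_id 3 maps to minor 48 (rbd3), minors 49..63
 * are rbd3p1..rbd3p15, and minor_to_rbd_dev_id(53) == 3.
 */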

static bool rbd_is_ro(struct rbd_device *rbd_dev)
{
	return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
}

static bool rbd_is_snap(struct rbd_device *rbd_dev)
{
	return rbd_dev->spec->snap_id != CEPH_NOSNAP;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	lockdep_assert_held(&rbd_dev->lock_rwsem);

	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, 0200, NULL, rbd_add);
static BUS_ATTR(remove, 0200, NULL, rbd_remove);
static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size);
static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);

/*
 * Return true if nothing else is pending.
 */
static bool pending_result_dec(struct pending_result *pending, int *result)
{
	rbd_assert(pending->num_pending > 0);

	if (*result && !pending->result)
		pending->result = *result;
	if (--pending->num_pending)
		return false;

	*result = pending->result;
	return true;
}
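
/*
 * Illustration (not upstream code): pending_result fans multiple
 * asynchronous sub-requests into one completion.  A caller that
 * submits three children sets num_pending = 3; each child completion
 * then does
 *
 *	if (pending_result_dec(&pending, &result))
 *		finish(result);		// hypothetical completion hook
 *
 * Only the third call returns true, and "result" carries the first
 * nonzero error seen (or 0 if all three succeeded).
 */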

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_set_read_only(struct block_device *bdev, bool ro)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	/*
	 * Both images mapped read-only and snapshots can't be marked
	 * read-write.
	 */
	if (!ro) {
		if (rbd_is_ro(rbd_dev))
			return -EROFS;

		rbd_assert(!rbd_is_snap(rbd_dev));
	}

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.set_read_only		= rbd_set_read_only,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_alloc_size,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_pool_ns,
	Opt_compression_hint,
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_alloc_size, "alloc_size=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	{Opt_pool_ns, "_pool_ns=%s"},
	{Opt_compression_hint, "compression_hint=%s"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	int	alloc_size;
	unsigned long	lock_timeout;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;

	u32 alloc_hint_flags;	/* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0	/* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true
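
/*
 * Example (illustrative): these options arrive as part of the string
 * written to the "add" bus attribute, typically composed by the rbd
 * CLI, e.g.
 *
 *	queue_depth=128,alloc_size=65536,lock_on_read,ro
 *
 * Each comma-separated token is fed to parse_rbd_opts_token() below;
 * an unrecognized token fails the mapping with -EINVAL.
 */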

struct parse_rbd_opts_ctx {
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
};

static int parse_rbd_opts_token(char *c, void *private)
{
	struct parse_rbd_opts_ctx *pctx = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		pctx->opts->queue_depth = intval;
		break;
	case Opt_alloc_size:
		if (intval < SECTOR_SIZE) {
			pr_err("alloc_size out of range\n");
			return -EINVAL;
		}
		if (!is_power_of_2(intval)) {
			pr_err("alloc_size must be a power of 2\n");
			return -EINVAL;
		}
		pctx->opts->alloc_size = intval;
		break;
	case Opt_lock_timeout:
		/* 0 is "wait forever" (i.e. infinite timeout) */
		if (intval < 0 || intval > INT_MAX / 1000) {
			pr_err("lock_timeout out of range\n");
			return -EINVAL;
		}
		pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
		break;
	case Opt_pool_ns:
		kfree(pctx->spec->pool_ns);
		pctx->spec->pool_ns = match_strdup(argstr);
		if (!pctx->spec->pool_ns)
			return -ENOMEM;
		break;
	case Opt_compression_hint:
		if (!strcmp(argstr[0].from, "none")) {
			pctx->opts->alloc_hint_flags &=
			    ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
			      CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
		} else if (!strcmp(argstr[0].from, "compressible")) {
			pctx->opts->alloc_hint_flags |=
			    CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
			pctx->opts->alloc_hint_flags &=
			    ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
		} else if (!strcmp(argstr[0].from, "incompressible")) {
			pctx->opts->alloc_hint_flags |=
			    CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
			pctx->opts->alloc_hint_flags &=
			    ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
		} else {
			return -EINVAL;
		}
		break;
	case Opt_read_only:
		pctx->opts->read_only = true;
		break;
	case Opt_read_write:
		pctx->opts->read_only = false;
		break;
	case Opt_lock_on_read:
		pctx->opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		pctx->opts->exclusive = true;
		break;
	case Opt_notrim:
		pctx->opts->trim = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	case OBJ_OP_ZEROOUT:
		return "zeroout";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client; called when the last reference is dropped.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock(&client_mutex);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = ceph_wait_for_latest_osdmap(rbdc->client,
					rbdc->client->options->mount_timeout);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}
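
/*
 * Example (illustrative): a default-format image has obj_order 22, so
 * rbd_obj_bytes() yields 1U << 22 = 4 MiB objects.  Without the
 * STRIPINGV2 feature the fallback above applies: stripe_unit becomes
 * the 4 MiB object size and stripe_count becomes 1, i.e. the image is
 * striped trivially, one object at a time.
 */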

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in reverse
 * order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
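
/*
 * Illustration (not upstream code): because the array is descending,
 * for snaps = { 40, 25, 10 } a lookup of snap_id 25 returns index 1,
 * while snap_id 30 returns BAD_SNAP_INDEX.  snapid_compare_reverse()
 * inverts the usual comparison result so that bsearch() works on this
 * descending order.
 */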

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
			 u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		BUG();
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_osd_submit(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
	     __func__, osd_req, obj_req, obj_req->ex.oe_objno,
	     obj_req->ex.oe_off, obj_req->ex.oe_len);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

/*
 * Must be called after rbd_obj_calc_img_extents().
 */
static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
{
	if (!obj_req->num_img_extents ||
	    (rbd_obj_is_entire(obj_req) &&
	     !obj_req->img_request->snapc->num_snaps))
		return false;

	return true;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
	case OBJ_OP_ZEROOUT:
		return true;
	default:
		BUG();
	}
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int result;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);

	/*
	 * Writes aren't allowed to return a data payload.  In some
	 * guarded write cases (e.g. stat + zero on an empty object)
	 * a stat response makes it through, but we don't care.
	 */
	if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
		result = 0;
	else
		result = osd_req->r_result;

	rbd_obj_handle_request(obj_req, result);
}

static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
	struct ceph_options *opt = rbd_dev->rbd_client->client->options;

	osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
			  struct ceph_snap_context *snapc, int num_ops)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
	int ret;

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
	if (!req)
		return ERR_PTR(-ENOMEM);

	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	/*
	 * Data objects may be stored in a separate pool, but always in
	 * the same namespace in that pool as the header in its pool.
	 */
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_base_oloc.pool = rbd_dev->layout.pool_id;

	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			       rbd_dev->header.object_prefix,
			       obj_req->ex.oe_objno);
	if (ret)
		return ERR_PTR(ret);

	return req;
}

static struct ceph_osd_request *
rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
{
	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
					 num_ops);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	INIT_LIST_HEAD(&obj_request->osd_reqs);
	mutex_init(&obj_request->state_mutex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_request *osd_req;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	while (!list_empty(&obj_request->osd_reqs)) {
		osd_req = list_first_entry(&obj_request->osd_reqs,
				    struct ceph_osd_request, r_private_item);
		list_del_init(&osd_req->r_private_item);
		ceph_osdc_put_request(osd_req);
	}

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		BUG();
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent data. */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

static void rbd_img_request_init(struct rbd_img_request *img_request,
				 struct rbd_device *rbd_dev,
				 enum obj_operation_type op_type)
{
	memset(img_request, 0, sizeof(*img_request));

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;

	INIT_LIST_HEAD(&img_request->lock_item);
	INIT_LIST_HEAD(&img_request->object_extents);
	mutex_init(&img_request->state_mutex);
}

static void rbd_img_capture_header(struct rbd_img_request *img_req)
{
	struct rbd_device *rbd_dev = img_req->rbd_dev;

	lockdep_assert_held(&rbd_dev->header_rwsem);

	if (rbd_img_is_write(img_req))
		img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
	else
		img_req->snap_id = rbd_dev->spec->snap_id;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_req);
}

static void rbd_img_request_destroy(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);

	WARN_ON(!list_empty(&img_request->lock_item));
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request_layered_test(img_request))
		rbd_dev_parent_put(img_request->rbd_dev);

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (test_bit(IMG_REQ_CHILD, &img_request->flags))
		kmem_cache_free(rbd_img_request_cache, img_request);
}

#define BITS_PER_OBJ	2
#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)

static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
				   u64 *index, u8 *shift)
{
	u32 off;

	rbd_assert(objno < rbd_dev->object_map_size);
	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
}
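
/*
 * Illustration (not upstream code): the object map packs two bits per
 * object, four objects per byte, most significant bits first.  For
 * objno 5: index = 5 / 4 = 1 and off = 5 % 4 = 1, so shift =
 * (4 - 1 - 1) * 2 = 4 and object 5's state lives in bits 5:4 of
 * object_map[1].  Reading it back is
 *
 *	state = (object_map[1] >> 4) & OBJ_MASK;
 */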

static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
{
	u64 index;
	u8 shift;

	lockdep_assert_held(&rbd_dev->object_map_lock);
	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
	return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
}

static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
{
	u64 index;
	u8 shift;
	u8 *p;

	lockdep_assert_held(&rbd_dev->object_map_lock);
	rbd_assert(!(val & ~OBJ_MASK));

	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
	p = &rbd_dev->object_map[index];
	*p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
}

static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
{
	u8 state;

	spin_lock(&rbd_dev->object_map_lock);
	state = __rbd_object_map_get(rbd_dev, objno);
	spin_unlock(&rbd_dev->object_map_lock);
	return state;
}

static bool use_object_map(struct rbd_device *rbd_dev)
{
	/*
	 * An image mapped read-only can't use the object map -- it isn't
	 * loaded because the header lock isn't acquired.  Someone else can
	 * write to the image and update the object map behind our back.
	 *
	 * A snapshot can't be written to, so using the object map is
	 * always safe.
	 */
	if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
		return false;

	return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
		!(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
}

static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
{
	u8 state;

	/* fall back to default logic if object map is disabled or invalid */
	if (!use_object_map(rbd_dev))
		return true;

	state = rbd_object_map_get(rbd_dev, objno);
	return state != OBJECT_NONEXISTENT;
}

static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
				struct ceph_object_id *oid)
{
	if (snap_id == CEPH_NOSNAP)
		ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
				rbd_dev->spec->image_id);
	else
		ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
				rbd_dev->spec->image_id, snap_id);
}

static int rbd_object_map_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	u8 lock_type;
	char *lock_tag;
	struct ceph_locker *lockers;
	u32 num_lockers;
	bool broke_lock = false;
	int ret;

	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

again:
	ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
			    CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
	if (ret != -EBUSY || broke_lock) {
		if (ret == -EEXIST)
			ret = 0;	/* already locked by myself */
		if (ret)
			rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
		return ret;
	}

	ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
				 RBD_LOCK_NAME, &lock_type, &lock_tag,
				 &lockers, &num_lockers);
	if (ret) {
		if (ret == -ENOENT)
			goto again;

		rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
		return ret;
	}

	kfree(lock_tag);
	if (num_lockers == 0)
		goto again;

	rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
		 ENTITY_NAME(lockers[0].id.name));

	ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
				  RBD_LOCK_NAME, lockers[0].id.cookie,
				  &lockers[0].id.name);
	ceph_free_lockers(lockers, num_lockers);
	if (ret) {
		if (ret == -ENOENT)
			goto again;

		rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
		return ret;
	}

	broke_lock = true;
	goto again;
}

static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	int ret;

	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

	ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
			      "");
	if (ret && ret != -ENOENT)
		rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
}

static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
{
	u8 struct_v;
	u32 struct_len;
	u32 header_len;
	void *header_end;
	int ret;

	ceph_decode_32_safe(p, end, header_len, e_inval);
	header_end = *p + header_len;

	ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	ceph_decode_64_safe(p, end, *object_map_size, e_inval);

	*p = header_end;
	return 0;

e_inval:
	return -EINVAL;
}
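
/*
 * Sketch of the reply layout consumed above (as implied by the decode
 * calls, not an authoritative protocol spec):
 *
 *	le32 header_len
 *	  "BitVector header" (ceph_start_decoding: version, compat, len)
 *	    le64 object_map_size	(number of objects)
 *	... header_len bytes after the le32 ...
 *	<packed 2-bit object states follow the header>
 *
 * decode_object_map_header() skips to header_end, so trailing header
 * fields added by newer OSDs are ignored.
 */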

static int __rbd_object_map_load(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	struct page **pages;
	void *p, *end;
	size_t reply_len;
	u64 num_objects;
	u64 object_map_bytes;
	u64 object_map_size;
	int num_pages;
	int ret;

	rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);

	num_objects = ceph_get_num_objects(&rbd_dev->layout,
					   rbd_dev->mapping.size);
	object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
					    BITS_PER_BYTE);
	num_pages = calc_pages_for(0, object_map_bytes) + 1;
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	reply_len = num_pages * PAGE_SIZE;
	rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
	ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
			     "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
			     NULL, 0, pages, &reply_len);
	if (ret)
		goto out;

	p = page_address(pages[0]);
	end = p + min(reply_len, (size_t)PAGE_SIZE);
	ret = decode_object_map_header(&p, end, &object_map_size);
	if (ret)
		goto out;

	if (object_map_size != num_objects) {
		rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
			 object_map_size, num_objects);
		ret = -EINVAL;
		goto out;
	}

	if (offset_in_page(p) + object_map_bytes > reply_len) {
		ret = -EINVAL;
		goto out;
	}

	rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
	if (!rbd_dev->object_map) {
		ret = -ENOMEM;
		goto out;
	}

	rbd_dev->object_map_size = object_map_size;
	ceph_copy_from_page_vector(pages, rbd_dev->object_map,
				   offset_in_page(p), object_map_bytes);

out:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

static void rbd_object_map_free(struct rbd_device *rbd_dev)
{
	kvfree(rbd_dev->object_map);
	rbd_dev->object_map = NULL;
	rbd_dev->object_map_size = 0;
}

static int rbd_object_map_load(struct rbd_device *rbd_dev)
{
	int ret;

	ret = __rbd_object_map_load(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_v2_get_flags(rbd_dev);
	if (ret) {
		rbd_object_map_free(rbd_dev);
		return ret;
	}

	if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
		rbd_warn(rbd_dev, "object map is invalid");

	return 0;
}

static int rbd_object_map_open(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_object_map_lock(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_object_map_load(rbd_dev);
	if (ret) {
		rbd_object_map_unlock(rbd_dev);
		return ret;
	}

	return 0;
}

static void rbd_object_map_close(struct rbd_device *rbd_dev)
{
	rbd_object_map_free(rbd_dev);
	rbd_object_map_unlock(rbd_dev);
}

/*
 * This function needs snap_id (or more precisely just something to
 * distinguish between HEAD and snapshot object maps), new_state and
 * current_state that were passed to rbd_object_map_update().
 *
 * To avoid allocating and stashing a context we piggyback on the OSD
 * request.  A HEAD update has two ops (assert_locked and call), a
 * snapshot update has one op (call).
 */
static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
					struct ceph_osd_request *osd_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_data *osd_data;
	u64 objno;
	u8 state, new_state, uninitialized_var(current_state);
	bool has_current_state;
	void *p;

	if (osd_req->r_result)
		return osd_req->r_result;

	/*
	 * Nothing to do for a snapshot object map.
	 */
	if (osd_req->r_num_ops == 1)
		return 0;

	/*
	 * Update in-memory HEAD object map.
	 */
	rbd_assert(osd_req->r_num_ops == 2);
	osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
	rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);

	p = page_address(osd_data->pages[0]);
	objno = ceph_decode_64(&p);
	rbd_assert(objno == obj_req->ex.oe_objno);
	rbd_assert(ceph_decode_64(&p) == objno + 1);
	new_state = ceph_decode_8(&p);
	has_current_state = ceph_decode_8(&p);
	if (has_current_state)
		current_state = ceph_decode_8(&p);

	spin_lock(&rbd_dev->object_map_lock);
	state = __rbd_object_map_get(rbd_dev, objno);
	if (!has_current_state || current_state == state ||
	    (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
		__rbd_object_map_set(rbd_dev, objno, new_state);
	spin_unlock(&rbd_dev->object_map_lock);

	return 0;
}

static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int result;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);

	result = rbd_object_map_update_finish(obj_req, osd_req);
	rbd_obj_handle_request(obj_req, result);
}

static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
{
	u8 state = rbd_object_map_get(rbd_dev, objno);

	if (state == new_state ||
	    (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
	    (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
		return false;

	return true;
}

static int rbd_cls_object_map_update(struct ceph_osd_request *req,
				     int which, u64 objno, u8 new_state,
				     const u8 *current_state)
{
	struct page **pages;
	void *p, *start;
	int ret;

	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
	if (ret)
		return ret;

	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	p = start = page_address(pages[0]);
	ceph_encode_64(&p, objno);
	ceph_encode_64(&p, objno + 1);
	ceph_encode_8(&p, new_state);
	if (current_state) {
		ceph_encode_8(&p, 1);
		ceph_encode_8(&p, *current_state);
	} else {
		ceph_encode_8(&p, 0);
	}

	osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
					  false, true);
	return 0;
}
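
/*
 * Sketch of the request payload built above (it mirrors the decode in
 * rbd_object_map_update_finish()):
 *
 *	le64 start_object_no	(objno)
 *	le64 end_object_no	(objno + 1, i.e. a one-object range)
 *	u8   new_state
 *	u8   has_current_state	(0 or 1)
 *	[u8  current_state]	(present only if has_current_state)
 *
 * When current_state is supplied the class method only flips objects
 * that are still in that state, making the update conditional.
 */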

/*
 * 0 - object map update sent
 * 1 - object map update isn't needed
 * <0 - error
 */
static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
				 u8 new_state, const u8 *current_state)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	int num_ops = 1;
	int which = 0;
	int ret;

	if (snap_id == CEPH_NOSNAP) {
		if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
			return 1;

		num_ops++; /* assert_locked */
	}

	req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
	req->r_callback = rbd_object_map_callback;
	req->r_priv = obj_req;

	rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&req->r_mtime);

	if (snap_id == CEPH_NOSNAP) {
		/*
		 * Guard the update: it is only valid while we still hold
		 * the exclusive lock on the HEAD object map.
		 */
		ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
					     CEPH_CLS_LOCK_EXCLUSIVE, "", "");
		if (ret)
			return ret;
	}

	ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
					new_state, current_state);
	if (ret)
		return ret;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		return ret;

	ceph_osdc_start_request(osdc, req, false);
	return 0;
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent (if any) */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}
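
/*
 * Illustration (not upstream code): with a parent overlap of 8192
 * bytes, the extent list { 0~4096, 4096~8192, 12288~4096 } is pruned
 * to { 0~4096, 4096~4096 }: the last extent starts past the overlap
 * and is dropped, and the middle one is trimmed so it ends exactly at
 * the overlap boundary.
 */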
2224
2225
2226
2227
2228
2229static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2230 bool entire)
2231{
2232 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2233 int ret;
2234
2235 if (!rbd_dev->parent_overlap)
2236 return 0;
2237
2238 ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2239 entire ? 0 : obj_req->ex.oe_off,
2240 entire ? rbd_dev->layout.object_size :
2241 obj_req->ex.oe_len,
2242 &obj_req->img_extents,
2243 &obj_req->num_img_extents);
2244 if (ret)
2245 return ret;
2246
2247 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2248 rbd_dev->parent_overlap);
2249 return 0;
2250}
2251
2252static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2253{
2254 struct rbd_obj_request *obj_req = osd_req->r_priv;
2255
2256 switch (obj_req->img_request->data_type) {
2257 case OBJ_REQUEST_BIO:
2258 osd_req_op_extent_osd_data_bio(osd_req, which,
2259 &obj_req->bio_pos,
2260 obj_req->ex.oe_len);
2261 break;
2262 case OBJ_REQUEST_BVECS:
2263 case OBJ_REQUEST_OWN_BVECS:
2264 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2265 obj_req->ex.oe_len);
2266 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2267 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2268 &obj_req->bvec_pos);
2269 break;
2270 default:
2271 BUG();
2272 }
2273}
2274
2275static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2276{
2277 struct page **pages;
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287 pages = ceph_alloc_page_vector(1, GFP_NOIO);
2288 if (IS_ERR(pages))
2289 return PTR_ERR(pages);
2290
2291 osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2292 osd_req_op_raw_data_in_pages(osd_req, which, pages,
2293 8 + sizeof(struct ceph_timespec),
2294 0, false, true);
2295 return 0;
2296}
2297
2298static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2299 u32 bytes)
2300{
2301 struct rbd_obj_request *obj_req = osd_req->r_priv;
2302 int ret;
2303
2304 ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2305 if (ret)
2306 return ret;
2307
2308 osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2309 obj_req->copyup_bvec_count, bytes);
2310 return 0;
2311}
2312
2313static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2314{
2315 obj_req->read_state = RBD_OBJ_READ_START;
2316 return 0;
2317}
2318
2319static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2320 int which)
2321{
2322 struct rbd_obj_request *obj_req = osd_req->r_priv;
2323 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2324 u16 opcode;
2325
2326 if (!use_object_map(rbd_dev) ||
2327 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2328 osd_req_op_alloc_hint_init(osd_req, which++,
2329 rbd_dev->layout.object_size,
2330 rbd_dev->layout.object_size,
2331 rbd_dev->opts->alloc_hint_flags);
2332 }
2333
2334 if (rbd_obj_is_entire(obj_req))
2335 opcode = CEPH_OSD_OP_WRITEFULL;
2336 else
2337 opcode = CEPH_OSD_OP_WRITE;
2338
2339 osd_req_op_extent_init(osd_req, which, opcode,
2340 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2341 rbd_osd_setup_data(osd_req, which);
2342}
2343
2344static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2345{
2346 int ret;
2347
2348
2349 ret = rbd_obj_calc_img_extents(obj_req, true);
2350 if (ret)
2351 return ret;
2352
2353 if (rbd_obj_copyup_enabled(obj_req))
2354 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2355
2356 obj_req->write_state = RBD_OBJ_WRITE_START;
2357 return 0;
2358}
2359
2360static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2361{
2362 return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2363 CEPH_OSD_OP_ZERO;
2364}
2365
2366static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2367 int which)
2368{
2369 struct rbd_obj_request *obj_req = osd_req->r_priv;
2370
2371 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2372 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2373 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2374 } else {
2375 osd_req_op_extent_init(osd_req, which,
2376 truncate_or_zero_opcode(obj_req),
2377 obj_req->ex.oe_off, obj_req->ex.oe_len,
2378 0, 0);
2379 }
2380}
2381
2382static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2383{
2384 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2385 u64 off, next_off;
2386 int ret;
2387
2388
2389
2390
2391
2392
2393
2394
2395
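	/*
	 * Align the range to alloc_size boundary and punt on discards
	 * that are too small to free up any space.
	 *
	 * alloc_size == object_size && is_tail() is a special case for
	 * filestore with filestore_punch_hole = false, needs to be
	 * treated as if it is not a tail.
	 */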
2396 if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2397 !rbd_obj_is_tail(obj_req)) {
2398 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2399 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2400 rbd_dev->opts->alloc_size);
2401 if (off >= next_off)
2402 return 1;
2403
2404 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2405 obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2406 off, next_off - off);
2407 obj_req->ex.oe_off = off;
2408 obj_req->ex.oe_len = next_off - off;
2409 }
2410
2411
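	/* reverse map the entire object onto the parent */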
2412 ret = rbd_obj_calc_img_extents(obj_req, true);
2413 if (ret)
2414 return ret;
2415
2416 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2417 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2418 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2419
2420 obj_req->write_state = RBD_OBJ_WRITE_START;
2421 return 0;
2422}
2423
2424static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2425 int which)
2426{
2427 struct rbd_obj_request *obj_req = osd_req->r_priv;
2428 u16 opcode;
2429
2430 if (rbd_obj_is_entire(obj_req)) {
2431 if (obj_req->num_img_extents) {
2432 if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2433 osd_req_op_init(osd_req, which++,
2434 CEPH_OSD_OP_CREATE, 0);
2435 opcode = CEPH_OSD_OP_TRUNCATE;
2436 } else {
2437 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2438 osd_req_op_init(osd_req, which++,
2439 CEPH_OSD_OP_DELETE, 0);
2440 opcode = 0;
2441 }
2442 } else {
2443 opcode = truncate_or_zero_opcode(obj_req);
2444 }
2445
2446 if (opcode)
2447 osd_req_op_extent_init(osd_req, which, opcode,
2448 obj_req->ex.oe_off, obj_req->ex.oe_len,
2449 0, 0);
2450}
2451
2452static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2453{
2454 int ret;
2455
2456
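	/* reverse map the entire object onto the parent */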
2457 ret = rbd_obj_calc_img_extents(obj_req, true);
2458 if (ret)
2459 return ret;
2460
2461 if (rbd_obj_copyup_enabled(obj_req))
2462 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2463 if (!obj_req->num_img_extents) {
2464 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2465 if (rbd_obj_is_entire(obj_req))
2466 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2467 }
2468
2469 obj_req->write_state = RBD_OBJ_WRITE_START;
2470 return 0;
2471}
2472
2473static int count_write_ops(struct rbd_obj_request *obj_req)
2474{
2475 struct rbd_img_request *img_req = obj_req->img_request;
2476
2477 switch (img_req->op_type) {
2478 case OBJ_OP_WRITE:
2479 if (!use_object_map(img_req->rbd_dev) ||
2480 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2481 return 2;
2482
2483 return 1;
2484 case OBJ_OP_DISCARD:
2485 return 1;
2486 case OBJ_OP_ZEROOUT:
2487 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2488 !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2489 return 2;
2490
2491 return 1;
2492 default:
2493 BUG();
2494 }
2495}
2496
2497static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2498 int which)
2499{
2500 struct rbd_obj_request *obj_req = osd_req->r_priv;
2501
2502 switch (obj_req->img_request->op_type) {
2503 case OBJ_OP_WRITE:
2504 __rbd_osd_setup_write_ops(osd_req, which);
2505 break;
2506 case OBJ_OP_DISCARD:
2507 __rbd_osd_setup_discard_ops(osd_req, which);
2508 break;
2509 case OBJ_OP_ZEROOUT:
2510 __rbd_osd_setup_zeroout_ops(osd_req, which);
2511 break;
2512 default:
2513 BUG();
2514 }
2515}
2516
2517
2518
2519
2520
2521
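/*
 * Prune the list of object requests (adjust offset and/or length, drop
 * redundant requests).  Prepare object request state machines and image
 * request state machine for execution.
 */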
2522static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2523{
2524 struct rbd_obj_request *obj_req, *next_obj_req;
2525 int ret;
2526
2527 for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2528 switch (img_req->op_type) {
2529 case OBJ_OP_READ:
2530 ret = rbd_obj_init_read(obj_req);
2531 break;
2532 case OBJ_OP_WRITE:
2533 ret = rbd_obj_init_write(obj_req);
2534 break;
2535 case OBJ_OP_DISCARD:
2536 ret = rbd_obj_init_discard(obj_req);
2537 break;
2538 case OBJ_OP_ZEROOUT:
2539 ret = rbd_obj_init_zeroout(obj_req);
2540 break;
2541 default:
2542 BUG();
2543 }
2544 if (ret < 0)
2545 return ret;
2546 if (ret > 0) {
2547 rbd_img_obj_request_del(img_req, obj_req);
2548 continue;
2549 }
2550 }
2551
2552 img_req->state = RBD_IMG_START;
2553 return 0;
2554}
2555
2556union rbd_img_fill_iter {
2557 struct ceph_bio_iter bio_iter;
2558 struct ceph_bvec_iter bvec_iter;
2559};
2560
2561struct rbd_img_fill_ctx {
2562 enum obj_request_type pos_type;
2563 union rbd_img_fill_iter *pos;
2564 union rbd_img_fill_iter iter;
2565 ceph_object_extent_fn_t set_pos_fn;
2566 ceph_object_extent_fn_t count_fn;
2567 ceph_object_extent_fn_t copy_fn;
2568};
2569
2570static struct ceph_object_extent *alloc_object_extent(void *arg)
2571{
2572 struct rbd_img_request *img_req = arg;
2573 struct rbd_obj_request *obj_req;
2574
2575 obj_req = rbd_obj_request_create();
2576 if (!obj_req)
2577 return NULL;
2578
2579 rbd_img_obj_request_add(img_req, obj_req);
2580 return &obj_req->ex;
2581}
2582
2583
2584
2585
2586
2587
2588
2589
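/*
 * A layout is "fancy" if its stripe unit differs from the object size:
 * data for a single object request may then come from multiple
 * discontiguous chunks of the provided data buffer.
 */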
2590static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2591{
2592 return l->stripe_unit != l->object_size;
2593}
2594
2595static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2596 struct ceph_file_extent *img_extents,
2597 u32 num_img_extents,
2598 struct rbd_img_fill_ctx *fctx)
2599{
2600 u32 i;
2601 int ret;
2602
2603 img_req->data_type = fctx->pos_type;
2604
2605
2606
2607
2608
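	/*
	 * Create object requests and set each object request's starting
	 * position in the provided bio (list) or bio_vec array.
	 */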
2609 fctx->iter = *fctx->pos;
2610 for (i = 0; i < num_img_extents; i++) {
2611 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2612 img_extents[i].fe_off,
2613 img_extents[i].fe_len,
2614 &img_req->object_extents,
2615 alloc_object_extent, img_req,
2616 fctx->set_pos_fn, &fctx->iter);
2617 if (ret)
2618 return ret;
2619 }
2620
2621 return __rbd_img_fill_request(img_req);
2622}
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
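/*
 * Map a list of image extents to a list of object extents, create the
 * corresponding object requests (normally each to a different object,
 * but not always) and add them to @img_req.  For each object request,
 * set up its data descriptor to point to the corresponding chunk(s) of
 * @fctx->pos data buffer.
 *
 * Because ceph_file_to_extents() will merge adjacent object extents
 * together, each object request's data descriptor may point to multiple
 * different chunks of @fctx->pos data buffer.
 *
 * @fctx->pos data buffer is assumed to be large enough.
 */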
2637static int rbd_img_fill_request(struct rbd_img_request *img_req,
2638 struct ceph_file_extent *img_extents,
2639 u32 num_img_extents,
2640 struct rbd_img_fill_ctx *fctx)
2641{
2642 struct rbd_device *rbd_dev = img_req->rbd_dev;
2643 struct rbd_obj_request *obj_req;
2644 u32 i;
2645 int ret;
2646
2647 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2648 !rbd_layout_is_fancy(&rbd_dev->layout))
2649 return rbd_img_fill_request_nocopy(img_req, img_extents,
2650 num_img_extents, fctx);
2651
2652 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2653
2654
2655
2656
2657
2658
2659
2660
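	/*
	 * Create object requests and determine ->bvec_count for each object
	 * request.  Note that ->bvec_count sum over all object requests may
	 * be greater than the number of bio_vecs in the provided bio (list)
	 * or bio_vec array because when mapped, those bio_vecs can straddle
	 * stripe unit boundaries.
	 */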
2661 fctx->iter = *fctx->pos;
2662 for (i = 0; i < num_img_extents; i++) {
2663 ret = ceph_file_to_extents(&rbd_dev->layout,
2664 img_extents[i].fe_off,
2665 img_extents[i].fe_len,
2666 &img_req->object_extents,
2667 alloc_object_extent, img_req,
2668 fctx->count_fn, &fctx->iter);
2669 if (ret)
2670 return ret;
2671 }
2672
2673 for_each_obj_request(img_req, obj_req) {
2674 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2675 sizeof(*obj_req->bvec_pos.bvecs),
2676 GFP_NOIO);
2677 if (!obj_req->bvec_pos.bvecs)
2678 return -ENOMEM;
2679 }
2680
2681
2682
2683
2684
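	/*
	 * Fill in each object request's private bio_vec array, splitting and
	 * rearranging the provided bio_vecs in stripe unit chunks as needed.
	 */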
2685 fctx->iter = *fctx->pos;
2686 for (i = 0; i < num_img_extents; i++) {
2687 ret = ceph_iterate_extents(&rbd_dev->layout,
2688 img_extents[i].fe_off,
2689 img_extents[i].fe_len,
2690 &img_req->object_extents,
2691 fctx->copy_fn, &fctx->iter);
2692 if (ret)
2693 return ret;
2694 }
2695
2696 return __rbd_img_fill_request(img_req);
2697}
2698
2699static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2700 u64 off, u64 len)
2701{
2702 struct ceph_file_extent ex = { off, len };
2703 union rbd_img_fill_iter dummy = {};
2704 struct rbd_img_fill_ctx fctx = {
2705 .pos_type = OBJ_REQUEST_NODATA,
2706 .pos = &dummy,
2707 };
2708
2709 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2710}
2711
2712static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2713{
2714 struct rbd_obj_request *obj_req =
2715 container_of(ex, struct rbd_obj_request, ex);
2716 struct ceph_bio_iter *it = arg;
2717
2718 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2719 obj_req->bio_pos = *it;
2720 ceph_bio_iter_advance(it, bytes);
2721}
2722
2723static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2724{
2725 struct rbd_obj_request *obj_req =
2726 container_of(ex, struct rbd_obj_request, ex);
2727 struct ceph_bio_iter *it = arg;
2728
2729 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2730 ceph_bio_iter_advance_step(it, bytes, ({
2731 obj_req->bvec_count++;
	}));
}
2735
2736static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2737{
2738 struct rbd_obj_request *obj_req =
2739 container_of(ex, struct rbd_obj_request, ex);
2740 struct ceph_bio_iter *it = arg;
2741
2742 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2743 ceph_bio_iter_advance_step(it, bytes, ({
2744 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2745 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2746 }));
2747}
2748
2749static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2750 struct ceph_file_extent *img_extents,
2751 u32 num_img_extents,
2752 struct ceph_bio_iter *bio_pos)
2753{
2754 struct rbd_img_fill_ctx fctx = {
2755 .pos_type = OBJ_REQUEST_BIO,
2756 .pos = (union rbd_img_fill_iter *)bio_pos,
2757 .set_pos_fn = set_bio_pos,
2758 .count_fn = count_bio_bvecs,
2759 .copy_fn = copy_bio_bvecs,
2760 };
2761
2762 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2763 &fctx);
2764}
2765
2766static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2767 u64 off, u64 len, struct bio *bio)
2768{
2769 struct ceph_file_extent ex = { off, len };
2770 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2771
2772 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2773}
2774
2775static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2776{
2777 struct rbd_obj_request *obj_req =
2778 container_of(ex, struct rbd_obj_request, ex);
2779 struct ceph_bvec_iter *it = arg;
2780
2781 obj_req->bvec_pos = *it;
2782 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2783 ceph_bvec_iter_advance(it, bytes);
2784}
2785
2786static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2787{
2788 struct rbd_obj_request *obj_req =
2789 container_of(ex, struct rbd_obj_request, ex);
2790 struct ceph_bvec_iter *it = arg;
2791
2792 ceph_bvec_iter_advance_step(it, bytes, ({
2793 obj_req->bvec_count++;
2794 }));
2795}
2796
2797static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2798{
2799 struct rbd_obj_request *obj_req =
2800 container_of(ex, struct rbd_obj_request, ex);
2801 struct ceph_bvec_iter *it = arg;
2802
2803 ceph_bvec_iter_advance_step(it, bytes, ({
2804 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2805 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2806 }));
2807}
2808
2809static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2810 struct ceph_file_extent *img_extents,
2811 u32 num_img_extents,
2812 struct ceph_bvec_iter *bvec_pos)
2813{
2814 struct rbd_img_fill_ctx fctx = {
2815 .pos_type = OBJ_REQUEST_BVECS,
2816 .pos = (union rbd_img_fill_iter *)bvec_pos,
2817 .set_pos_fn = set_bvec_pos,
2818 .count_fn = count_bvecs,
2819 .copy_fn = copy_bvecs,
2820 };
2821
2822 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2823 &fctx);
2824}
2825
2826static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2827 struct ceph_file_extent *img_extents,
2828 u32 num_img_extents,
2829 struct bio_vec *bvecs)
2830{
2831 struct ceph_bvec_iter it = {
2832 .bvecs = bvecs,
2833 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2834 num_img_extents) },
2835 };
2836
2837 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2838 &it);
2839}
2840
2841static void rbd_img_handle_request_work(struct work_struct *work)
2842{
2843 struct rbd_img_request *img_req =
2844 container_of(work, struct rbd_img_request, work);
2845
2846 rbd_img_handle_request(img_req, img_req->work_result);
2847}
2848
2849static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2850{
2851 INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2852 img_req->work_result = result;
2853 queue_work(rbd_wq, &img_req->work);
2854}
2855
2856static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2857{
2858 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2859
2860 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2861 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2862 return true;
2863 }
2864
2865 dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2866 obj_req->ex.oe_objno);
2867 return false;
2868}
2869
2870static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2871{
2872 struct ceph_osd_request *osd_req;
2873 int ret;
2874
2875 osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2876 if (IS_ERR(osd_req))
2877 return PTR_ERR(osd_req);
2878
2879 osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2880 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2881 rbd_osd_setup_data(osd_req, 0);
2882 rbd_osd_format_read(osd_req);
2883
2884 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2885 if (ret)
2886 return ret;
2887
2888 rbd_osd_submit(osd_req);
2889 return 0;
2890}
2891
2892static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2893{
2894 struct rbd_img_request *img_req = obj_req->img_request;
2895 struct rbd_device *parent = img_req->rbd_dev->parent;
2896 struct rbd_img_request *child_img_req;
2897 int ret;
2898
2899 child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2900 if (!child_img_req)
2901 return -ENOMEM;
2902
2903 rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2904 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2905 child_img_req->obj_request = obj_req;
2906
2907 down_read(&parent->header_rwsem);
2908 rbd_img_capture_header(child_img_req);
2909 up_read(&parent->header_rwsem);
2910
2911 dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2912 obj_req);
2913
2914 if (!rbd_img_is_write(img_req)) {
2915 switch (img_req->data_type) {
2916 case OBJ_REQUEST_BIO:
2917 ret = __rbd_img_fill_from_bio(child_img_req,
2918 obj_req->img_extents,
2919 obj_req->num_img_extents,
2920 &obj_req->bio_pos);
2921 break;
2922 case OBJ_REQUEST_BVECS:
2923 case OBJ_REQUEST_OWN_BVECS:
2924 ret = __rbd_img_fill_from_bvecs(child_img_req,
2925 obj_req->img_extents,
2926 obj_req->num_img_extents,
2927 &obj_req->bvec_pos);
2928 break;
2929 default:
2930 BUG();
2931 }
2932 } else {
2933 ret = rbd_img_fill_from_bvecs(child_img_req,
2934 obj_req->img_extents,
2935 obj_req->num_img_extents,
2936 obj_req->copyup_bvecs);
2937 }
2938 if (ret) {
2939 rbd_img_request_destroy(child_img_req);
2940 return ret;
2941 }
2942
2943
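	/* avoid parent chain recursion */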
2944 rbd_img_schedule(child_img_req, 0);
2945 return 0;
2946}
2947
2948static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2949{
2950 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2951 int ret;
2952
2953again:
2954 switch (obj_req->read_state) {
2955 case RBD_OBJ_READ_START:
2956 rbd_assert(!*result);
2957
2958 if (!rbd_obj_may_exist(obj_req)) {
2959 *result = -ENOENT;
2960 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2961 goto again;
2962 }
2963
2964 ret = rbd_obj_read_object(obj_req);
2965 if (ret) {
2966 *result = ret;
2967 return true;
2968 }
2969 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2970 return false;
2971 case RBD_OBJ_READ_OBJECT:
2972 if (*result == -ENOENT && rbd_dev->parent_overlap) {
2973
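			/* reverse map this object extent onto the parent */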
2974 ret = rbd_obj_calc_img_extents(obj_req, false);
2975 if (ret) {
2976 *result = ret;
2977 return true;
2978 }
2979 if (obj_req->num_img_extents) {
2980 ret = rbd_obj_read_from_parent(obj_req);
2981 if (ret) {
2982 *result = ret;
2983 return true;
2984 }
2985 obj_req->read_state = RBD_OBJ_READ_PARENT;
2986 return false;
2987 }
2988 }
2989
2990
2991
2992
2993
2994
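		/*
		 * -ENOENT means a hole in the image -- zero-fill the entire
		 * length of the request.  A short read also implies zero-fill
		 * to the end of the request.
		 */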
2995 if (*result == -ENOENT) {
2996 rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2997 *result = 0;
2998 } else if (*result >= 0) {
2999 if (*result < obj_req->ex.oe_len)
3000 rbd_obj_zero_range(obj_req, *result,
3001 obj_req->ex.oe_len - *result);
3002 else
3003 rbd_assert(*result == obj_req->ex.oe_len);
3004 *result = 0;
3005 }
3006 return true;
3007 case RBD_OBJ_READ_PARENT:
3008
3009
3010
3011
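		/*
		 * The parent image is read only up to the overlap -- zero-fill
		 * from the overlap to the end of the request.
		 */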
3012 if (!*result) {
3013 u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
3014
3015 if (obj_overlap < obj_req->ex.oe_len)
3016 rbd_obj_zero_range(obj_req, obj_overlap,
3017 obj_req->ex.oe_len - obj_overlap);
3018 }
3019 return true;
3020 default:
3021 BUG();
3022 }
3023}
3024
3025static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3026{
3027 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3028
3029 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3030 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3031
3032 if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3033 (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3034 dout("%s %p noop for nonexistent\n", __func__, obj_req);
3035 return true;
3036 }
3037
3038 return false;
3039}
3040
3041
3042
3043
3044
3045
3046
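/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error
 */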
3047static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3048{
3049 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3050 u8 new_state;
3051
3052 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3053 return 1;
3054
3055 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3056 new_state = OBJECT_PENDING;
3057 else
3058 new_state = OBJECT_EXISTS;
3059
3060 return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3061}
3062
3063static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3064{
3065 struct ceph_osd_request *osd_req;
3066 int num_ops = count_write_ops(obj_req);
3067 int which = 0;
3068 int ret;
3069
3070 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3071 num_ops++;
3072
3073 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3074 if (IS_ERR(osd_req))
3075 return PTR_ERR(osd_req);
3076
3077 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3078 ret = rbd_osd_setup_stat(osd_req, which++);
3079 if (ret)
3080 return ret;
3081 }
3082
3083 rbd_osd_setup_write_ops(osd_req, which);
3084 rbd_osd_format_write(osd_req);
3085
3086 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3087 if (ret)
3088 return ret;
3089
3090 rbd_osd_submit(osd_req);
3091 return 0;
3092}
3093
3094
3095
3096
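/*
 * copyup_bvecs pages are never highmem pages
 */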
3097static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3098{
3099 struct ceph_bvec_iter it = {
3100 .bvecs = bvecs,
3101 .iter = { .bi_size = bytes },
3102 };
3103
3104 ceph_bvec_iter_advance_step(&it, bytes, ({
3105 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3106 bv.bv_len))
3107 return false;
3108 }));
3109 return true;
3110}
3111
3112#define MODS_ONLY U32_MAX
3113
3114static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3115 u32 bytes)
3116{
3117 struct ceph_osd_request *osd_req;
3118 int ret;
3119
3120 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3121 rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3122
3123 osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3124 if (IS_ERR(osd_req))
3125 return PTR_ERR(osd_req);
3126
3127 ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3128 if (ret)
3129 return ret;
3130
3131 rbd_osd_format_write(osd_req);
3132
3133 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3134 if (ret)
3135 return ret;
3136
3137 rbd_osd_submit(osd_req);
3138 return 0;
3139}
3140
3141static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3142 u32 bytes)
3143{
3144 struct ceph_osd_request *osd_req;
3145 int num_ops = count_write_ops(obj_req);
3146 int which = 0;
3147 int ret;
3148
3149 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3150
3151 if (bytes != MODS_ONLY)
3152 num_ops++;
3153
3154 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3155 if (IS_ERR(osd_req))
3156 return PTR_ERR(osd_req);
3157
3158 if (bytes != MODS_ONLY) {
3159 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3160 if (ret)
3161 return ret;
3162 }
3163
3164 rbd_osd_setup_write_ops(osd_req, which);
3165 rbd_osd_format_write(osd_req);
3166
3167 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3168 if (ret)
3169 return ret;
3170
3171 rbd_osd_submit(osd_req);
3172 return 0;
3173}
3174
3175static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3176{
3177 u32 i;
3178
3179 rbd_assert(!obj_req->copyup_bvecs);
3180 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3181 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3182 sizeof(*obj_req->copyup_bvecs),
3183 GFP_NOIO);
3184 if (!obj_req->copyup_bvecs)
3185 return -ENOMEM;
3186
3187 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3188 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3189
3190 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3191 if (!obj_req->copyup_bvecs[i].bv_page)
3192 return -ENOMEM;
3193
3194 obj_req->copyup_bvecs[i].bv_offset = 0;
3195 obj_req->copyup_bvecs[i].bv_len = len;
3196 obj_overlap -= len;
3197 }
3198
3199 rbd_assert(!obj_overlap);
3200 return 0;
3201}
3202
3203
3204
3205
3206
3207
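/*
 * The target object doesn't exist.  Read the data for the entire
 * target object up to the overlap point (if any) from the parent,
 * so we can use it for a copyup.
 */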
3208static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3209{
3210 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3211 int ret;
3212
3213 rbd_assert(obj_req->num_img_extents);
3214 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3215 rbd_dev->parent_overlap);
3216 if (!obj_req->num_img_extents) {
3217
3218
3219
3220
3221
3222
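		/*
		 * The overlap has become 0 (most likely because the
		 * image has been flattened).  Re-submit the original write
		 * request -- pass MODS_ONLY since the copyup isn't needed
		 * anymore.
		 */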
3223 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3224 }
3225
3226 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3227 if (ret)
3228 return ret;
3229
3230 return rbd_obj_read_from_parent(obj_req);
3231}
3232
3233static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3234{
3235 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3236 struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3237 u8 new_state;
3238 u32 i;
3239 int ret;
3240
3241 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3242
3243 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3244 return;
3245
3246 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3247 return;
3248
3249 for (i = 0; i < snapc->num_snaps; i++) {
3250 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3251 i + 1 < snapc->num_snaps)
3252 new_state = OBJECT_EXISTS_CLEAN;
3253 else
3254 new_state = OBJECT_EXISTS;
3255
3256 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3257 new_state, NULL);
3258 if (ret < 0) {
3259 obj_req->pending.result = ret;
3260 return;
3261 }
3262
3263 rbd_assert(!ret);
3264 obj_req->pending.num_pending++;
3265 }
3266}
3267
3268static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3269{
3270 u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3271 int ret;
3272
3273 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3274
3275
3276
3277
3278
3279
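	/*
	 * Only send non-zero copyup data to save some I/O and network
	 * bandwidth -- zero copyup data is equivalent to the object not
	 * existing.
	 */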
3280 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3281 bytes = 0;
3282
3283 if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3284
3285
3286
3287
3288
3289
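		/*
		 * Send a copyup request with an empty snapshot context to
		 * deep-copyup the object through all existing snapshots.
		 * A second request with the current snapshot context will be
		 * sent for the actual modification.
		 */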
3290 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3291 if (ret) {
3292 obj_req->pending.result = ret;
3293 return;
3294 }
3295
3296 obj_req->pending.num_pending++;
3297 bytes = MODS_ONLY;
3298 }
3299
3300 ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3301 if (ret) {
3302 obj_req->pending.result = ret;
3303 return;
3304 }
3305
3306 obj_req->pending.num_pending++;
3307}
3308
3309static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3310{
3311 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3312 int ret;
3313
3314again:
3315 switch (obj_req->copyup_state) {
3316 case RBD_OBJ_COPYUP_START:
3317 rbd_assert(!*result);
3318
3319 ret = rbd_obj_copyup_read_parent(obj_req);
3320 if (ret) {
3321 *result = ret;
3322 return true;
3323 }
3324 if (obj_req->num_img_extents)
3325 obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3326 else
3327 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3328 return false;
3329 case RBD_OBJ_COPYUP_READ_PARENT:
3330 if (*result)
3331 return true;
3332
3333 if (is_zero_bvecs(obj_req->copyup_bvecs,
3334 rbd_obj_img_extents_bytes(obj_req))) {
3335 dout("%s %p detected zeros\n", __func__, obj_req);
3336 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3337 }
3338
3339 rbd_obj_copyup_object_maps(obj_req);
3340 if (!obj_req->pending.num_pending) {
3341 *result = obj_req->pending.result;
3342 obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3343 goto again;
3344 }
3345 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3346 return false;
3347 case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3348 if (!pending_result_dec(&obj_req->pending, result))
			return false;
		fallthrough;
3351 case RBD_OBJ_COPYUP_OBJECT_MAPS:
3352 if (*result) {
3353 rbd_warn(rbd_dev, "snap object map update failed: %d",
3354 *result);
3355 return true;
3356 }
3357
3358 rbd_obj_copyup_write_object(obj_req);
3359 if (!obj_req->pending.num_pending) {
3360 *result = obj_req->pending.result;
3361 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3362 goto again;
3363 }
3364 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3365 return false;
3366 case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3367 if (!pending_result_dec(&obj_req->pending, result))
			return false;
		fallthrough;
3370 case RBD_OBJ_COPYUP_WRITE_OBJECT:
3371 return true;
3372 default:
3373 BUG();
3374 }
3375}
3376
3377
3378
3379
3380
3381
3382
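/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error
 */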
3383static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3384{
3385 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3386 u8 current_state = OBJECT_PENDING;
3387
3388 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3389 return 1;
3390
3391 if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3392 return 1;
3393
3394 return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
				     &current_state);
3396}
3397
3398static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3399{
3400 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3401 int ret;
3402
3403again:
3404 switch (obj_req->write_state) {
3405 case RBD_OBJ_WRITE_START:
3406 rbd_assert(!*result);
3407
3408 if (rbd_obj_write_is_noop(obj_req))
3409 return true;
3410
3411 ret = rbd_obj_write_pre_object_map(obj_req);
3412 if (ret < 0) {
3413 *result = ret;
3414 return true;
3415 }
3416 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3417 if (ret > 0)
3418 goto again;
3419 return false;
3420 case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3421 if (*result) {
3422 rbd_warn(rbd_dev, "pre object map update failed: %d",
3423 *result);
3424 return true;
3425 }
3426 ret = rbd_obj_write_object(obj_req);
3427 if (ret) {
3428 *result = ret;
3429 return true;
3430 }
3431 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3432 return false;
3433 case RBD_OBJ_WRITE_OBJECT:
3434 if (*result == -ENOENT) {
3435 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3436 *result = 0;
3437 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3438 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3439 goto again;
3440 }
3441
3442
3443
3444
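			/*
			 * On a non-existent object:
			 *   delete - -ENOENT, truncate/zero - 0
			 */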
3445 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3446 *result = 0;
3447 }
3448 if (*result)
3449 return true;
3450
3451 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3452 goto again;
3453 case __RBD_OBJ_WRITE_COPYUP:
3454 if (!rbd_obj_advance_copyup(obj_req, result))
			return false;
		fallthrough;
3457 case RBD_OBJ_WRITE_COPYUP:
3458 if (*result) {
3459 rbd_warn(rbd_dev, "copyup failed: %d", *result);
3460 return true;
3461 }
3462 ret = rbd_obj_write_post_object_map(obj_req);
3463 if (ret < 0) {
3464 *result = ret;
3465 return true;
3466 }
3467 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3468 if (ret > 0)
3469 goto again;
3470 return false;
3471 case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3472 if (*result)
3473 rbd_warn(rbd_dev, "post object map update failed: %d",
3474 *result);
3475 return true;
3476 default:
3477 BUG();
3478 }
3479}
3480
3481
3482
3483
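/*
 * Return true if @obj_req is completed.
 */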
3484static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3485 int *result)
3486{
3487 struct rbd_img_request *img_req = obj_req->img_request;
3488 struct rbd_device *rbd_dev = img_req->rbd_dev;
3489 bool done;
3490
3491 mutex_lock(&obj_req->state_mutex);
3492 if (!rbd_img_is_write(img_req))
3493 done = rbd_obj_advance_read(obj_req, result);
3494 else
3495 done = rbd_obj_advance_write(obj_req, result);
3496 mutex_unlock(&obj_req->state_mutex);
3497
3498 if (done && *result) {
3499 rbd_assert(*result < 0);
3500 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3501 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3502 obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3503 }
3504 return done;
3505}
3506
3507
3508
3509
3510
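/*
 * This is open-coded in rbd_img_handle_request() to avoid parent chain
 * recursion.
 */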
3511static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3512{
3513 if (__rbd_obj_handle_request(obj_req, &result))
3514 rbd_img_handle_request(obj_req->img_request, result);
3515}
3516
3517static bool need_exclusive_lock(struct rbd_img_request *img_req)
3518{
3519 struct rbd_device *rbd_dev = img_req->rbd_dev;
3520
3521 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3522 return false;
3523
3524 if (rbd_is_ro(rbd_dev))
3525 return false;
3526
3527 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3528 if (rbd_dev->opts->lock_on_read ||
3529 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3530 return true;
3531
3532 return rbd_img_is_write(img_req);
3533}
3534
3535static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3536{
3537 struct rbd_device *rbd_dev = img_req->rbd_dev;
3538 bool locked;
3539
3540 lockdep_assert_held(&rbd_dev->lock_rwsem);
3541 locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3542 spin_lock(&rbd_dev->lock_lists_lock);
3543 rbd_assert(list_empty(&img_req->lock_item));
3544 if (!locked)
3545 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3546 else
3547 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3548 spin_unlock(&rbd_dev->lock_lists_lock);
3549 return locked;
3550}
3551
3552static void rbd_lock_del_request(struct rbd_img_request *img_req)
3553{
3554 struct rbd_device *rbd_dev = img_req->rbd_dev;
3555 bool need_wakeup;
3556
3557 lockdep_assert_held(&rbd_dev->lock_rwsem);
3558 spin_lock(&rbd_dev->lock_lists_lock);
3559 rbd_assert(!list_empty(&img_req->lock_item));
3560 list_del_init(&img_req->lock_item);
3561 need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3562 list_empty(&rbd_dev->running_list));
3563 spin_unlock(&rbd_dev->lock_lists_lock);
3564 if (need_wakeup)
3565 complete(&rbd_dev->releasing_wait);
3566}
3567
3568static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3569{
3570 struct rbd_device *rbd_dev = img_req->rbd_dev;
3571
3572 if (!need_exclusive_lock(img_req))
3573 return 1;
3574
3575 if (rbd_lock_add_request(img_req))
3576 return 1;
3577
3578 if (rbd_dev->opts->exclusive) {
3579 WARN_ON(1);
3580 return -EROFS;
3581 }
3582
3583
3584
3585
3586
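	/*
	 * Note the use of mod_delayed_work() in rbd_acquire_lock()
	 * and cancel_delayed_work() in wake_lock_waiters().
	 */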
3587 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3588 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3589 return 0;
3590}
3591
3592static void rbd_img_object_requests(struct rbd_img_request *img_req)
3593{
3594 struct rbd_obj_request *obj_req;
3595
3596 rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3597
3598 for_each_obj_request(img_req, obj_req) {
3599 int result = 0;
3600
3601 if (__rbd_obj_handle_request(obj_req, &result)) {
3602 if (result) {
3603 img_req->pending.result = result;
3604 return;
3605 }
3606 } else {
3607 img_req->pending.num_pending++;
3608 }
3609 }
3610}
3611
3612static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3613{
3614 struct rbd_device *rbd_dev = img_req->rbd_dev;
3615 int ret;
3616
3617again:
3618 switch (img_req->state) {
3619 case RBD_IMG_START:
3620 rbd_assert(!*result);
3621
3622 ret = rbd_img_exclusive_lock(img_req);
3623 if (ret < 0) {
3624 *result = ret;
3625 return true;
3626 }
3627 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3628 if (ret > 0)
3629 goto again;
3630 return false;
3631 case RBD_IMG_EXCLUSIVE_LOCK:
3632 if (*result)
3633 return true;
3634
3635 rbd_assert(!need_exclusive_lock(img_req) ||
3636 __rbd_is_lock_owner(rbd_dev));
3637
3638 rbd_img_object_requests(img_req);
3639 if (!img_req->pending.num_pending) {
3640 *result = img_req->pending.result;
3641 img_req->state = RBD_IMG_OBJECT_REQUESTS;
3642 goto again;
3643 }
3644 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3645 return false;
3646 case __RBD_IMG_OBJECT_REQUESTS:
3647 if (!pending_result_dec(&img_req->pending, result))
			return false;
		fallthrough;
3650 case RBD_IMG_OBJECT_REQUESTS:
3651 return true;
3652 default:
3653 BUG();
3654 }
3655}
3656
3657
3658
3659
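/*
 * Return true if @img_req is completed.
 */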
3660static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3661 int *result)
3662{
3663 struct rbd_device *rbd_dev = img_req->rbd_dev;
3664 bool done;
3665
3666 if (need_exclusive_lock(img_req)) {
3667 down_read(&rbd_dev->lock_rwsem);
3668 mutex_lock(&img_req->state_mutex);
3669 done = rbd_img_advance(img_req, result);
3670 if (done)
3671 rbd_lock_del_request(img_req);
3672 mutex_unlock(&img_req->state_mutex);
3673 up_read(&rbd_dev->lock_rwsem);
3674 } else {
3675 mutex_lock(&img_req->state_mutex);
3676 done = rbd_img_advance(img_req, result);
3677 mutex_unlock(&img_req->state_mutex);
3678 }
3679
3680 if (done && *result) {
3681 rbd_assert(*result < 0);
3682 rbd_warn(rbd_dev, "%s%s result %d",
3683 test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3684 obj_op_name(img_req->op_type), *result);
3685 }
3686 return done;
3687}
3688
3689static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3690{
3691again:
3692 if (!__rbd_img_handle_request(img_req, &result))
3693 return;
3694
3695 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3696 struct rbd_obj_request *obj_req = img_req->obj_request;
3697
3698 rbd_img_request_destroy(img_req);
3699 if (__rbd_obj_handle_request(obj_req, &result)) {
3700 img_req = obj_req->img_request;
3701 goto again;
3702 }
3703 } else {
3704 struct request *rq = blk_mq_rq_from_pdu(img_req);
3705
3706 rbd_img_request_destroy(img_req);
3707 blk_mq_end_request(rq, errno_to_blk_status(result));
3708 }
3709}
3710
3711static const struct rbd_client_id rbd_empty_cid;
3712
3713static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3714 const struct rbd_client_id *rhs)
3715{
3716 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3717}
3718
3719static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3720{
3721 struct rbd_client_id cid;
3722
3723 mutex_lock(&rbd_dev->watch_mutex);
3724 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3725 cid.handle = rbd_dev->watch_cookie;
3726 mutex_unlock(&rbd_dev->watch_mutex);
3727 return cid;
3728}
3729
3730
3731
3732
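/*
 * lock_rwsem must be held for write
 */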
3733static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3734 const struct rbd_client_id *cid)
3735{
3736 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3737 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3738 cid->gid, cid->handle);
3739 rbd_dev->owner_cid = *cid;
3740}
3741
3742static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3743{
3744 mutex_lock(&rbd_dev->watch_mutex);
3745 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3746 mutex_unlock(&rbd_dev->watch_mutex);
3747}
3748
3749static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3750{
3751 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3752
3753 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3754 strcpy(rbd_dev->lock_cookie, cookie);
3755 rbd_set_owner_cid(rbd_dev, &cid);
3756 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3757}
3758
3759
3760
3761
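/*
 * lock_rwsem must be held for write
 */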
3762static int rbd_lock(struct rbd_device *rbd_dev)
3763{
3764 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3765 char cookie[32];
3766 int ret;
3767
3768 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3769 rbd_dev->lock_cookie[0] != '\0');
3770
3771 format_lock_cookie(rbd_dev, cookie);
3772 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3773 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3774 RBD_LOCK_TAG, "", 0);
3775 if (ret)
3776 return ret;
3777
3778 __rbd_lock(rbd_dev, cookie);
3779 return 0;
3780}
3781
3782
3783
3784
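/*
 * lock_rwsem must be held for write
 */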
3785static void rbd_unlock(struct rbd_device *rbd_dev)
3786{
3787 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3788 int ret;
3789
3790 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3791 rbd_dev->lock_cookie[0] == '\0');
3792
3793 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3794 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3795 if (ret && ret != -ENOENT)
3796 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3797
3798
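	/* treat errors as the image is unlocked */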
3799 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3800 rbd_dev->lock_cookie[0] = '\0';
3801 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3802 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3803}
3804
3805static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3806 enum rbd_notify_op notify_op,
3807 struct page ***preply_pages,
3808 size_t *preply_len)
3809{
3810 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3811 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3812 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3813 int buf_size = sizeof(buf);
3814 void *p = buf;
3815
3816 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3817
3818
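	/* encode *LockPayload NotifyMessage (op + ClientId) */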
3819 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3820 ceph_encode_32(&p, notify_op);
3821 ceph_encode_64(&p, cid.gid);
3822 ceph_encode_64(&p, cid.handle);
3823
3824 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3825 &rbd_dev->header_oloc, buf, buf_size,
3826 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3827}
3828
3829static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3830 enum rbd_notify_op notify_op)
3831{
3832 __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3833}
3834
3835static void rbd_notify_acquired_lock(struct work_struct *work)
3836{
3837 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3838 acquired_lock_work);
3839
3840 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3841}
3842
3843static void rbd_notify_released_lock(struct work_struct *work)
3844{
3845 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3846 released_lock_work);
3847
3848 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3849}
3850
3851static int rbd_request_lock(struct rbd_device *rbd_dev)
3852{
3853 struct page **reply_pages;
3854 size_t reply_len;
3855 bool lock_owner_responded = false;
3856 int ret;
3857
3858 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3859
3860 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3861 &reply_pages, &reply_len);
3862 if (ret && ret != -ETIMEDOUT) {
3863 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3864 goto out;
3865 }
3866
3867 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3868 void *p = page_address(reply_pages[0]);
3869 void *const end = p + reply_len;
3870 u32 n;
3871
3872 ceph_decode_32_safe(&p, end, n, e_inval);
3873 while (n--) {
3874 u8 struct_v;
3875 u32 len;
3876
3877 ceph_decode_need(&p, end, 8 + 8, e_inval);
			p += 8 + 8;	/* skip gid and cookie */
3879
3880 ceph_decode_32_safe(&p, end, len, e_inval);
3881 if (!len)
3882 continue;
3883
3884 if (lock_owner_responded) {
3885 rbd_warn(rbd_dev,
3886 "duplicate lock owners detected");
3887 ret = -EIO;
3888 goto out;
3889 }
3890
3891 lock_owner_responded = true;
3892 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3893 &struct_v, &len);
3894 if (ret) {
3895 rbd_warn(rbd_dev,
3896 "failed to decode ResponseMessage: %d",
3897 ret);
3898 goto e_inval;
3899 }
3900
3901 ret = ceph_decode_32(&p);
3902 }
3903 }
3904
3905 if (!lock_owner_responded) {
3906 rbd_warn(rbd_dev, "no lock owners detected");
3907 ret = -ETIMEDOUT;
3908 }
3909
3910out:
3911 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3912 return ret;
3913
3914e_inval:
3915 ret = -EINVAL;
3916 goto out;
3917}
3918
3919
3920
3921
3922
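/*
 * Either image request state machine(s) or rbd_add_acquire_lock()
 * (i.e. "rbd map").
 */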
3923static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3924{
3925 struct rbd_img_request *img_req;
3926
3927 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3928 lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
3929
3930 cancel_delayed_work(&rbd_dev->lock_dwork);
3931 if (!completion_done(&rbd_dev->acquire_wait)) {
3932 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3933 list_empty(&rbd_dev->running_list));
3934 rbd_dev->acquire_err = result;
3935 complete_all(&rbd_dev->acquire_wait);
3936 return;
3937 }
3938
3939 list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3940 mutex_lock(&img_req->state_mutex);
3941 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3942 rbd_img_schedule(img_req, result);
3943 mutex_unlock(&img_req->state_mutex);
3944 }
3945
3946 list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3947}
3948
3949static int get_lock_owner_info(struct rbd_device *rbd_dev,
3950 struct ceph_locker **lockers, u32 *num_lockers)
3951{
3952 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3953 u8 lock_type;
3954 char *lock_tag;
3955 int ret;
3956
3957 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3958
3959 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3960 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3961 &lock_type, &lock_tag, lockers, num_lockers);
3962 if (ret)
3963 return ret;
3964
3965 if (*num_lockers == 0) {
3966 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3967 goto out;
3968 }
3969
3970 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3971 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3972 lock_tag);
3973 ret = -EBUSY;
3974 goto out;
3975 }
3976
3977 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3978 rbd_warn(rbd_dev, "shared lock type detected");
3979 ret = -EBUSY;
3980 goto out;
3981 }
3982
3983 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3984 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3985 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3986 (*lockers)[0].id.cookie);
3987 ret = -EBUSY;
3988 goto out;
3989 }
3990
3991out:
3992 kfree(lock_tag);
3993 return ret;
3994}
3995
3996static int find_watcher(struct rbd_device *rbd_dev,
3997 const struct ceph_locker *locker)
3998{
3999 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4000 struct ceph_watch_item *watchers;
4001 u32 num_watchers;
4002 u64 cookie;
4003 int i;
4004 int ret;
4005
4006 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4007 &rbd_dev->header_oloc, &watchers,
4008 &num_watchers);
4009 if (ret)
4010 return ret;
4011
4012 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4013 for (i = 0; i < num_watchers; i++) {
4014
4015
4016
4017
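		/*
		 * Ignore addr->type while comparing.  This mimics
		 * entity_addr_t::get_legacy_str() + strcmp().
		 */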
4018 if (ceph_addr_equal_no_type(&watchers[i].addr,
4019 &locker->info.addr) &&
4020 watchers[i].cookie == cookie) {
4021 struct rbd_client_id cid = {
4022 .gid = le64_to_cpu(watchers[i].name.num),
4023 .handle = cookie,
4024 };
4025
4026 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4027 rbd_dev, cid.gid, cid.handle);
4028 rbd_set_owner_cid(rbd_dev, &cid);
4029 ret = 1;
4030 goto out;
4031 }
4032 }
4033
4034 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
4035 ret = 0;
4036out:
4037 kfree(watchers);
4038 return ret;
4039}
4040
4041
4042
4043
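/*
 * lock_rwsem must be held for write
 */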
4044static int rbd_try_lock(struct rbd_device *rbd_dev)
4045{
4046 struct ceph_client *client = rbd_dev->rbd_client->client;
4047 struct ceph_locker *lockers;
4048 u32 num_lockers;
4049 int ret;
4050
4051 for (;;) {
4052 ret = rbd_lock(rbd_dev);
4053 if (ret != -EBUSY)
4054 return ret;
4055
4056
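		/* determine if the current lock holder is still alive */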
4057 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4058 if (ret)
4059 return ret;
4060
4061 if (num_lockers == 0)
4062 goto again;
4063
4064 ret = find_watcher(rbd_dev, lockers);
4065 if (ret)
4066 goto out;
4067
4068 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4069 ENTITY_NAME(lockers[0].id.name));
4070
4071 ret = ceph_monc_blocklist_add(&client->monc,
4072 &lockers[0].info.addr);
4073 if (ret) {
4074 rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
4075 ENTITY_NAME(lockers[0].id.name), ret);
4076 goto out;
4077 }
4078
4079 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4080 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4081 lockers[0].id.cookie,
4082 &lockers[0].id.name);
4083 if (ret && ret != -ENOENT)
4084 goto out;
4085
4086again:
4087 ceph_free_lockers(lockers, num_lockers);
4088 }
4089
4090out:
4091 ceph_free_lockers(lockers, num_lockers);
4092 return ret;
4093}
4094
4095static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4096{
4097 int ret;
4098
4099 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4100 ret = rbd_object_map_open(rbd_dev);
4101 if (ret)
4102 return ret;
4103 }
4104
4105 return 0;
4106}
4107
4108
4109
4110
4111
4112
4113
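/*
 * Return:
 *   0 - lock acquired
 *   1 - caller should call rbd_request_lock()
 *  <0 - error
 */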
4114static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4115{
4116 int ret;
4117
4118 down_read(&rbd_dev->lock_rwsem);
4119 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4120 rbd_dev->lock_state);
4121 if (__rbd_is_lock_owner(rbd_dev)) {
4122 up_read(&rbd_dev->lock_rwsem);
4123 return 0;
4124 }
4125
4126 up_read(&rbd_dev->lock_rwsem);
4127 down_write(&rbd_dev->lock_rwsem);
4128 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4129 rbd_dev->lock_state);
4130 if (__rbd_is_lock_owner(rbd_dev)) {
4131 up_write(&rbd_dev->lock_rwsem);
4132 return 0;
4133 }
4134
4135 ret = rbd_try_lock(rbd_dev);
4136 if (ret < 0) {
4137 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4138 if (ret == -EBLOCKLISTED)
4139 goto out;
4140
		ret = 1; /* request lock anyway */
4142 }
4143 if (ret > 0) {
4144 up_write(&rbd_dev->lock_rwsem);
4145 return ret;
4146 }
4147
4148 rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4149 rbd_assert(list_empty(&rbd_dev->running_list));
4150
4151 ret = rbd_post_acquire_action(rbd_dev);
4152 if (ret) {
4153 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4154
4155
4156
4157
4158
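		/*
		 * Can't stay in RBD_LOCK_STATE_LOCKED because
		 * rbd_lock_add_request() would let the request through,
		 * assuming that e.g. object map is locked and loaded.
		 */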
4159 rbd_unlock(rbd_dev);
4160 }
4161
4162out:
4163 wake_lock_waiters(rbd_dev, ret);
4164 up_write(&rbd_dev->lock_rwsem);
4165 return ret;
4166}
4167
4168static void rbd_acquire_lock(struct work_struct *work)
4169{
4170 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4171 struct rbd_device, lock_dwork);
4172 int ret;
4173
4174 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4175again:
4176 ret = rbd_try_acquire_lock(rbd_dev);
4177 if (ret <= 0) {
4178 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4179 return;
4180 }
4181
4182 ret = rbd_request_lock(rbd_dev);
4183 if (ret == -ETIMEDOUT) {
4184 goto again;
4185 } else if (ret == -EROFS) {
4186 rbd_warn(rbd_dev, "peer will not release lock");
4187 down_write(&rbd_dev->lock_rwsem);
4188 wake_lock_waiters(rbd_dev, ret);
4189 up_write(&rbd_dev->lock_rwsem);
4190 } else if (ret < 0) {
4191 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4192 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4193 RBD_RETRY_DELAY);
4194 } else {
4195
4196
4197
4198
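		/*
		 * lock owner acked, but resend if we don't see them
		 * release the lock
		 */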
4199 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4200 rbd_dev);
4201 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4202 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4203 }
4204}
4205
4206static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4207{
4208 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4209 lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
4210
4211 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4212 return false;
4213
4214
4215
4216
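	/*
	 * Ensure that all in-flight IO is flushed.
	 */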
4217 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4218 rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4219 if (list_empty(&rbd_dev->running_list))
4220 return true;
4221
4222 up_write(&rbd_dev->lock_rwsem);
4223 wait_for_completion(&rbd_dev->releasing_wait);
4224
4225 down_write(&rbd_dev->lock_rwsem);
4226 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4227 return false;
4228
4229 rbd_assert(list_empty(&rbd_dev->running_list));
4230 return true;
4231}
4232
4233static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4234{
4235 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4236 rbd_object_map_close(rbd_dev);
4237}
4238
4239static void __rbd_release_lock(struct rbd_device *rbd_dev)
4240{
4241 rbd_assert(list_empty(&rbd_dev->running_list));
4242
4243 rbd_pre_release_action(rbd_dev);
4244 rbd_unlock(rbd_dev);
4245}
4246
4247
4248
4249
4250static void rbd_release_lock(struct rbd_device *rbd_dev)
4251{
4252 if (!rbd_quiesce_lock(rbd_dev))
4253 return;
4254
4255 __rbd_release_lock(rbd_dev);
4256
4257
4258
4259
4260
4261
4262
4263
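	/*
	 * Give others a chance to grab the lock - we would re-acquire
	 * almost immediately if we got new IO while draining the running
	 * list otherwise.  We need to ack our own notifications, so this
	 * lock_dwork will be requeued from rbd_handle_released_lock() by
	 * way of maybe_kick_acquire().
	 */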
4264 cancel_delayed_work(&rbd_dev->lock_dwork);
4265}
4266
4267static void rbd_release_lock_work(struct work_struct *work)
4268{
4269 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4270 unlock_work);
4271
4272 down_write(&rbd_dev->lock_rwsem);
4273 rbd_release_lock(rbd_dev);
4274 up_write(&rbd_dev->lock_rwsem);
4275}
4276
4277static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4278{
4279 bool have_requests;
4280
4281 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4282 if (__rbd_is_lock_owner(rbd_dev))
4283 return;
4284
4285 spin_lock(&rbd_dev->lock_lists_lock);
4286 have_requests = !list_empty(&rbd_dev->acquiring_list);
4287 spin_unlock(&rbd_dev->lock_lists_lock);
4288 if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4289 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4290 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4291 }
4292}
4293
4294static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4295 void **p)
4296{
4297 struct rbd_client_id cid = { 0 };
4298
4299 if (struct_v >= 2) {
4300 cid.gid = ceph_decode_64(p);
4301 cid.handle = ceph_decode_64(p);
4302 }
4303
4304 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4305 cid.handle);
4306 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4307 down_write(&rbd_dev->lock_rwsem);
4308 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4309 dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4310 __func__, rbd_dev, cid.gid, cid.handle);
4311 } else {
4312 rbd_set_owner_cid(rbd_dev, &cid);
4313 }
4314 downgrade_write(&rbd_dev->lock_rwsem);
4315 } else {
4316 down_read(&rbd_dev->lock_rwsem);
4317 }
4318
4319 maybe_kick_acquire(rbd_dev);
4320 up_read(&rbd_dev->lock_rwsem);
4321}
4322
4323static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4324 void **p)
4325{
4326 struct rbd_client_id cid = { 0 };
4327
4328 if (struct_v >= 2) {
4329 cid.gid = ceph_decode_64(p);
4330 cid.handle = ceph_decode_64(p);
4331 }
4332
4333 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4334 cid.handle);
4335 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4336 down_write(&rbd_dev->lock_rwsem);
4337 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4338 dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4339 __func__, rbd_dev, cid.gid, cid.handle,
4340 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4341 } else {
4342 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4343 }
4344 downgrade_write(&rbd_dev->lock_rwsem);
4345 } else {
4346 down_read(&rbd_dev->lock_rwsem);
4347 }
4348
4349 maybe_kick_acquire(rbd_dev);
4350 up_read(&rbd_dev->lock_rwsem);
4351}
4352
4353
4354
4355
4356
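/*
 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
 * ResponseMessage is needed.
 */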
4357static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4358 void **p)
4359{
4360 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4361 struct rbd_client_id cid = { 0 };
4362 int result = 1;
4363
4364 if (struct_v >= 2) {
4365 cid.gid = ceph_decode_64(p);
4366 cid.handle = ceph_decode_64(p);
4367 }
4368
4369 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4370 cid.handle);
4371 if (rbd_cid_equal(&cid, &my_cid))
4372 return result;
4373
4374 down_read(&rbd_dev->lock_rwsem);
4375 if (__rbd_is_lock_owner(rbd_dev)) {
4376 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4377 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4378 goto out_unlock;
4379
4380
4381
4382
4383
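		/*
		 * encode ResponseMessage(0) so the peer can detect
		 * a missing owner
		 */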
4384 result = 0;
4385
4386 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4387 if (!rbd_dev->opts->exclusive) {
4388 dout("%s rbd_dev %p queueing unlock_work\n",
4389 __func__, rbd_dev);
4390 queue_work(rbd_dev->task_wq,
4391 &rbd_dev->unlock_work);
4392 } else {
4393
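				/* refuse to release the lock */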
4394 result = -EROFS;
4395 }
4396 }
4397 }
4398
4399out_unlock:
4400 up_read(&rbd_dev->lock_rwsem);
4401 return result;
4402}
4403
4404static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4405 u64 notify_id, u64 cookie, s32 *result)
4406{
4407 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4408 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4409 int buf_size = sizeof(buf);
4410 int ret;
4411
4412 if (result) {
4413 void *p = buf;
4414
4415
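		/* encode ResponseMessage */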
4416 ceph_start_encoding(&p, 1, 1,
4417 buf_size - CEPH_ENCODING_START_BLK_LEN);
4418 ceph_encode_32(&p, *result);
4419 } else {
4420 buf_size = 0;
4421 }
4422
4423 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4424 &rbd_dev->header_oloc, notify_id, cookie,
4425 buf, buf_size);
4426 if (ret)
4427 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4428}
4429
4430static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4431 u64 cookie)
4432{
4433 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4434 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4435}
4436
4437static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4438 u64 notify_id, u64 cookie, s32 result)
4439{
4440 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4441 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4442}
4443
4444static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4445 u64 notifier_id, void *data, size_t data_len)
4446{
4447 struct rbd_device *rbd_dev = arg;
4448 void *p = data;
4449 void *const end = p + data_len;
4450 u8 struct_v = 0;
4451 u32 len;
4452 u32 notify_op;
4453 int ret;
4454
4455 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4456 __func__, rbd_dev, cookie, notify_id, data_len);
4457 if (data_len) {
4458 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4459 &struct_v, &len);
4460 if (ret) {
4461 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4462 ret);
4463 return;
4464 }
4465
4466 notify_op = ceph_decode_32(&p);
4467 } else {
4468
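		/* legacy notification for header updates */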
4469 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4470 len = 0;
4471 }
4472
4473 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4474 switch (notify_op) {
4475 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4476 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4477 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4478 break;
4479 case RBD_NOTIFY_OP_RELEASED_LOCK:
4480 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4481 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4482 break;
4483 case RBD_NOTIFY_OP_REQUEST_LOCK:
4484 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4485 if (ret <= 0)
4486 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4487 cookie, ret);
4488 else
4489 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4490 break;
4491 case RBD_NOTIFY_OP_HEADER_UPDATE:
4492 ret = rbd_dev_refresh(rbd_dev);
4493 if (ret)
4494 rbd_warn(rbd_dev, "refresh failed: %d", ret);
4495
4496 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4497 break;
4498 default:
4499 if (rbd_is_lock_owner(rbd_dev))
4500 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4501 cookie, -EOPNOTSUPP);
4502 else
4503 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4504 break;
4505 }
4506}
4507
4508static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4509
4510static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4511{
4512 struct rbd_device *rbd_dev = arg;
4513
4514 rbd_warn(rbd_dev, "encountered watch error: %d", err);
4515
4516 down_write(&rbd_dev->lock_rwsem);
4517 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4518 up_write(&rbd_dev->lock_rwsem);
4519
4520 mutex_lock(&rbd_dev->watch_mutex);
4521 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4522 __rbd_unregister_watch(rbd_dev);
4523 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4524
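		/* schedule an immediate rewatch attempt */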
4525 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4526 }
4527 mutex_unlock(&rbd_dev->watch_mutex);
4528}
4529
/*
 * watch_mutex must be locked
 */
4533static int __rbd_register_watch(struct rbd_device *rbd_dev)
4534{
4535 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4536 struct ceph_osd_linger_request *handle;
4537
4538 rbd_assert(!rbd_dev->watch_handle);
4539 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4540
4541 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4542 &rbd_dev->header_oloc, rbd_watch_cb,
4543 rbd_watch_errcb, rbd_dev);
4544 if (IS_ERR(handle))
4545 return PTR_ERR(handle);
4546
4547 rbd_dev->watch_handle = handle;
4548 return 0;
4549}
4550
/*
 * watch_mutex must be locked
 */
4554static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4555{
4556 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4557 int ret;
4558
4559 rbd_assert(rbd_dev->watch_handle);
4560 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4561
4562 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4563 if (ret)
4564 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4565
4566 rbd_dev->watch_handle = NULL;
4567}
4568
4569static int rbd_register_watch(struct rbd_device *rbd_dev)
4570{
4571 int ret;
4572
4573 mutex_lock(&rbd_dev->watch_mutex);
4574 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4575 ret = __rbd_register_watch(rbd_dev);
4576 if (ret)
4577 goto out;
4578
4579 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4580 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4581
4582out:
4583 mutex_unlock(&rbd_dev->watch_mutex);
4584 return ret;
4585}
4586
4587static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4588{
4589 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4590
4591 cancel_work_sync(&rbd_dev->acquired_lock_work);
4592 cancel_work_sync(&rbd_dev->released_lock_work);
4593 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4594 cancel_work_sync(&rbd_dev->unlock_work);
4595}
4596
/*
 * header_rwsem must not be held to avoid a deadlock with
 * rbd_dev_refresh() when flushing notifies.
 */
4601static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4602{
4603 cancel_tasks_sync(rbd_dev);
4604
4605 mutex_lock(&rbd_dev->watch_mutex);
4606 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4607 __rbd_unregister_watch(rbd_dev);
4608 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4609 mutex_unlock(&rbd_dev->watch_mutex);
4610
4611 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4612 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4613}
4614
/*
 * lock_rwsem must be held for write
 */
4618static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4619{
4620 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4621 char cookie[32];
4622 int ret;
4623
4624 if (!rbd_quiesce_lock(rbd_dev))
4625 return;
4626
4627 format_lock_cookie(rbd_dev, cookie);
4628 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4629 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4630 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4631 RBD_LOCK_TAG, cookie);
4632 if (ret) {
4633 if (ret != -EOPNOTSUPP)
4634 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4635 ret);
4636
		/*
		 * Lock cookie cannot be updated on older OSDs, so do
		 * a manual release and queue an acquire.
		 */
4641 __rbd_release_lock(rbd_dev);
4642 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4643 } else {
4644 __rbd_lock(rbd_dev, cookie);
4645 wake_lock_waiters(rbd_dev, 0);
4646 }
4647}
4648
4649static void rbd_reregister_watch(struct work_struct *work)
4650{
4651 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4652 struct rbd_device, watch_dwork);
4653 int ret;
4654
4655 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4656
4657 mutex_lock(&rbd_dev->watch_mutex);
4658 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4659 mutex_unlock(&rbd_dev->watch_mutex);
4660 return;
4661 }
4662
4663 ret = __rbd_register_watch(rbd_dev);
4664 if (ret) {
4665 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
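		/*
		 * Being blocklisted or the image header object being
		 * gone is fatal; any other error is retried after a
		 * delay.
		 */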
4666 if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4667 queue_delayed_work(rbd_dev->task_wq,
4668 &rbd_dev->watch_dwork,
4669 RBD_RETRY_DELAY);
4670 mutex_unlock(&rbd_dev->watch_mutex);
4671 return;
4672 }
4673
4674 mutex_unlock(&rbd_dev->watch_mutex);
4675 down_write(&rbd_dev->lock_rwsem);
4676 wake_lock_waiters(rbd_dev, ret);
4677 up_write(&rbd_dev->lock_rwsem);
4678 return;
4679 }
4680
4681 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4682 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4683 mutex_unlock(&rbd_dev->watch_mutex);
4684
4685 down_write(&rbd_dev->lock_rwsem);
4686 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4687 rbd_reacquire_lock(rbd_dev);
4688 up_write(&rbd_dev->lock_rwsem);
4689
4690 ret = rbd_dev_refresh(rbd_dev);
4691 if (ret)
4692 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4693}
4694
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the outbound buffer, or a negative error code.
 */
4699static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4700 struct ceph_object_id *oid,
4701 struct ceph_object_locator *oloc,
4702 const char *method_name,
4703 const void *outbound,
4704 size_t outbound_size,
4705 void *inbound,
4706 size_t inbound_size)
4707{
4708 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4709 struct page *req_page = NULL;
4710 struct page *reply_page;
4711 int ret;
4712
	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
4720 if (outbound) {
4721 if (outbound_size > PAGE_SIZE)
4722 return -E2BIG;
4723
4724 req_page = alloc_page(GFP_KERNEL);
4725 if (!req_page)
4726 return -ENOMEM;
4727
4728 memcpy(page_address(req_page), outbound, outbound_size);
4729 }
4730
4731 reply_page = alloc_page(GFP_KERNEL);
4732 if (!reply_page) {
4733 if (req_page)
4734 __free_page(req_page);
4735 return -ENOMEM;
4736 }
4737
4738 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4739 CEPH_OSD_FLAG_READ, req_page, outbound_size,
4740 &reply_page, &inbound_size);
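	/*
	 * On success, ceph_osdc_call() updates inbound_size to the
	 * actual reply length, which becomes our return value.
	 */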
4741 if (!ret) {
4742 memcpy(inbound, page_address(reply_page), inbound_size);
4743 ret = inbound_size;
4744 }
4745
4746 if (req_page)
4747 __free_page(req_page);
4748 __free_page(reply_page);
4749 return ret;
4750}
4751
4752static void rbd_queue_workfn(struct work_struct *work)
4753{
4754 struct rbd_img_request *img_request =
4755 container_of(work, struct rbd_img_request, work);
4756 struct rbd_device *rbd_dev = img_request->rbd_dev;
4757 enum obj_operation_type op_type = img_request->op_type;
4758 struct request *rq = blk_mq_rq_from_pdu(img_request);
4759 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4760 u64 length = blk_rq_bytes(rq);
4761 u64 mapping_size;
4762 int result;
4763
	/* Ignore/skip any zero-length requests */
4765 if (!length) {
4766 dout("%s: zero-length request\n", __func__);
4767 result = 0;
4768 goto err_img_request;
4769 }
4770
4771 blk_mq_start_request(rq);
4772
4773 down_read(&rbd_dev->header_rwsem);
4774 mapping_size = rbd_dev->mapping.size;
4775 rbd_img_capture_header(img_request);
4776 up_read(&rbd_dev->header_rwsem);
4777
4778 if (offset + length > mapping_size) {
4779 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4780 length, mapping_size);
4781 result = -EIO;
4782 goto err_img_request;
4783 }
4784
4785 dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4786 img_request, obj_op_name(op_type), offset, length);
4787
4788 if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4789 result = rbd_img_fill_nodata(img_request, offset, length);
4790 else
4791 result = rbd_img_fill_from_bio(img_request, offset, length,
4792 rq->bio);
4793 if (result)
4794 goto err_img_request;
4795
4796 rbd_img_handle_request(img_request, 0);
4797 return;
4798
4799err_img_request:
4800 rbd_img_request_destroy(img_request);
4801 if (result)
4802 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4803 obj_op_name(op_type), length, offset, result);
4804 blk_mq_end_request(rq, errno_to_blk_status(result));
4805}
4806
4807static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4808 const struct blk_mq_queue_data *bd)
4809{
4810 struct rbd_device *rbd_dev = hctx->queue->queuedata;
4811 struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4812 enum obj_operation_type op_type;
4813
4814 switch (req_op(bd->rq)) {
4815 case REQ_OP_DISCARD:
4816 op_type = OBJ_OP_DISCARD;
4817 break;
4818 case REQ_OP_WRITE_ZEROES:
4819 op_type = OBJ_OP_ZEROOUT;
4820 break;
4821 case REQ_OP_WRITE:
4822 op_type = OBJ_OP_WRITE;
4823 break;
4824 case REQ_OP_READ:
4825 op_type = OBJ_OP_READ;
4826 break;
4827 default:
4828 rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4829 return BLK_STS_IOERR;
4830 }
4831
4832 rbd_img_request_init(img_req, rbd_dev, op_type);
4833
4834 if (rbd_img_is_write(img_req)) {
4835 if (rbd_is_ro(rbd_dev)) {
4836 rbd_warn(rbd_dev, "%s on read-only mapping",
4837 obj_op_name(img_req->op_type));
4838 return BLK_STS_IOERR;
4839 }
4840 rbd_assert(!rbd_is_snap(rbd_dev));
4841 }
4842
4843 INIT_WORK(&img_req->work, rbd_queue_workfn);
4844 queue_work(rbd_wq, &img_req->work);
4845 return BLK_STS_OK;
4846}
4847
4848static void rbd_free_disk(struct rbd_device *rbd_dev)
4849{
4850 blk_cleanup_queue(rbd_dev->disk->queue);
4851 blk_mq_free_tag_set(&rbd_dev->tag_set);
4852 put_disk(rbd_dev->disk);
4853 rbd_dev->disk = NULL;
4854}
4855
4856static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4857 struct ceph_object_id *oid,
4858 struct ceph_object_locator *oloc,
4859 void *buf, int buf_len)
4861{
4862 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4863 struct ceph_osd_request *req;
4864 struct page **pages;
4865 int num_pages = calc_pages_for(0, buf_len);
4866 int ret;
4867
4868 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4869 if (!req)
4870 return -ENOMEM;
4871
4872 ceph_oid_copy(&req->r_base_oid, oid);
4873 ceph_oloc_copy(&req->r_base_oloc, oloc);
4874 req->r_flags = CEPH_OSD_FLAG_READ;
4875
4876 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4877 if (IS_ERR(pages)) {
4878 ret = PTR_ERR(pages);
4879 goto out_req;
4880 }
4881
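	/*
	 * own_pages is true: the page vector is freed when the osd
	 * request is put, so there is no explicit free on this path.
	 */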
4882 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4883 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4884 true);
4885
4886 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4887 if (ret)
4888 goto out_req;
4889
4890 ceph_osdc_start_request(osdc, req, false);
4891 ret = ceph_osdc_wait_request(osdc, req);
4892 if (ret >= 0)
4893 ceph_copy_from_page_vector(pages, buf, 0, ret);
4894
4895out_req:
4896 ceph_osdc_put_request(req);
4897 return ret;
4898}
4899
/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
4905static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4906{
4907 struct rbd_image_header_ondisk *ondisk = NULL;
4908 u32 snap_count = 0;
4909 u64 names_size = 0;
4910 u32 want_count;
4911 int ret;
4912
	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
4920 do {
4921 size_t size;
4922
4923 kfree(ondisk);
4924
4925 size = sizeof (*ondisk);
4926 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4927 size += names_size;
4928 ondisk = kmalloc(size, GFP_KERNEL);
4929 if (!ondisk)
4930 return -ENOMEM;
4931
4932 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4933 &rbd_dev->header_oloc, ondisk, size);
4934 if (ret < 0)
4935 goto out;
4936 if ((size_t)ret < size) {
4937 ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
				 size, ret);
4940 goto out;
4941 }
4942 if (!rbd_dev_ondisk_valid(ondisk)) {
4943 ret = -ENXIO;
4944 rbd_warn(rbd_dev, "invalid header");
4945 goto out;
4946 }
4947
4948 names_size = le64_to_cpu(ondisk->snap_names_len);
4949 want_count = snap_count;
4950 snap_count = le32_to_cpu(ondisk->snap_count);
4951 } while (snap_count != want_count);
4952
4953 ret = rbd_header_from_disk(rbd_dev, ondisk);
4954out:
4955 kfree(ondisk);
4956
4957 return ret;
4958}
4959
4960static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4961{
4962 sector_t size;
4963
	/*
	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
	 * try to update its size.  If REMOVING is set, updating size
	 * is just useless work since the device can't be opened.
	 */
4969 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4970 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4971 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4972 dout("setting size to %llu sectors", (unsigned long long)size);
4973 set_capacity(rbd_dev->disk, size);
4974 revalidate_disk_size(rbd_dev->disk, true);
4975 }
4976}
4977
4978static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4979{
4980 u64 mapping_size;
4981 int ret;
4982
4983 down_write(&rbd_dev->header_rwsem);
4984 mapping_size = rbd_dev->mapping.size;
4985
4986 ret = rbd_dev_header_info(rbd_dev);
4987 if (ret)
4988 goto out;
4989
	/*
	 * If there is a parent, see if it has disappeared due to the
	 * mapped image getting flattened.
	 */
4994 if (rbd_dev->parent) {
4995 ret = rbd_dev_v2_parent_info(rbd_dev);
4996 if (ret)
4997 goto out;
4998 }
4999
5000 rbd_assert(!rbd_is_snap(rbd_dev));
5001 rbd_dev->mapping.size = rbd_dev->header.image_size;
5002
5003out:
5004 up_write(&rbd_dev->header_rwsem);
5005 if (!ret && mapping_size != rbd_dev->mapping.size)
5006 rbd_dev_update_size(rbd_dev);
5007
5008 return ret;
5009}
5010
5011static const struct blk_mq_ops rbd_mq_ops = {
5012 .queue_rq = rbd_queue_rq,
5013};
5014
5015static int rbd_init_disk(struct rbd_device *rbd_dev)
5016{
5017 struct gendisk *disk;
5018 struct request_queue *q;
5019 unsigned int objset_bytes =
5020 rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5021 int err;
5022
	/* create gendisk info */
5024 disk = alloc_disk(single_major ?
5025 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5026 RBD_MINORS_PER_MAJOR);
5027 if (!disk)
5028 return -ENOMEM;
5029
5030 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5031 rbd_dev->dev_id);
5032 disk->major = rbd_dev->major;
5033 disk->first_minor = rbd_dev->minor;
5034 if (single_major)
5035 disk->flags |= GENHD_FL_EXT_DEVT;
5036 disk->fops = &rbd_bd_ops;
5037 disk->private_data = rbd_dev;
5038
5039 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5040 rbd_dev->tag_set.ops = &rbd_mq_ops;
5041 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5042 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5043 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5044 rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
5045 rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
5046
5047 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5048 if (err)
5049 goto out_disk;
5050
5051 q = blk_mq_init_queue(&rbd_dev->tag_set);
5052 if (IS_ERR(q)) {
5053 err = PTR_ERR(q);
5054 goto out_tag_set;
5055 }
5056
5057 blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5059
5060 blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5061 q->limits.max_sectors = queue_max_hw_sectors(q);
5062 blk_queue_max_segments(q, USHRT_MAX);
5063 blk_queue_max_segment_size(q, UINT_MAX);
5064 blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5065 blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5066
5067 if (rbd_dev->opts->trim) {
5068 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5069 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5070 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5071 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5072 }
5073
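	/*
	 * With checksumming enabled (the default, i.e. no "nocrc"),
	 * data pages must not change while a write is in flight, so
	 * ask the block layer for stable writes.
	 */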
5074 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5075 blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
5076
	/*
	 * disk_release() expects a queue ref from add_disk() and will
	 * put it.  Hold an extra ref until add_disk() is called.
	 */
5081 WARN_ON(!blk_get_queue(q));
5082 disk->queue = q;
5083 q->queuedata = rbd_dev;
5084
5085 rbd_dev->disk = disk;
5086
5087 return 0;
5088out_tag_set:
5089 blk_mq_free_tag_set(&rbd_dev->tag_set);
5090out_disk:
5091 put_disk(disk);
5092 return err;
5093}
5094
/*
 * sysfs
 */
5098
5099static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5100{
5101 return container_of(dev, struct rbd_device, dev);
5102}
5103
5104static ssize_t rbd_size_show(struct device *dev,
5105 struct device_attribute *attr, char *buf)
5106{
5107 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5108
5109 return sprintf(buf, "%llu\n",
5110 (unsigned long long)rbd_dev->mapping.size);
5111}
5112
5113static ssize_t rbd_features_show(struct device *dev,
5114 struct device_attribute *attr, char *buf)
5115{
5116 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5117
5118 return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5119}
5120
5121static ssize_t rbd_major_show(struct device *dev,
5122 struct device_attribute *attr, char *buf)
5123{
5124 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5125
5126 if (rbd_dev->major)
5127 return sprintf(buf, "%d\n", rbd_dev->major);
5128
5129 return sprintf(buf, "(none)\n");
5130}
5131
5132static ssize_t rbd_minor_show(struct device *dev,
5133 struct device_attribute *attr, char *buf)
5134{
5135 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5136
5137 return sprintf(buf, "%d\n", rbd_dev->minor);
5138}
5139
5140static ssize_t rbd_client_addr_show(struct device *dev,
5141 struct device_attribute *attr, char *buf)
5142{
5143 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5144 struct ceph_entity_addr *client_addr =
5145 ceph_client_addr(rbd_dev->rbd_client->client);
5146
5147 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5148 le32_to_cpu(client_addr->nonce));
5149}
5150
5151static ssize_t rbd_client_id_show(struct device *dev,
5152 struct device_attribute *attr, char *buf)
5153{
5154 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5155
5156 return sprintf(buf, "client%lld\n",
5157 ceph_client_gid(rbd_dev->rbd_client->client));
5158}
5159
5160static ssize_t rbd_cluster_fsid_show(struct device *dev,
5161 struct device_attribute *attr, char *buf)
5162{
5163 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5164
5165 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5166}
5167
5168static ssize_t rbd_config_info_show(struct device *dev,
5169 struct device_attribute *attr, char *buf)
5170{
5171 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5172
5173 if (!capable(CAP_SYS_ADMIN))
5174 return -EPERM;
5175
5176 return sprintf(buf, "%s\n", rbd_dev->config_info);
5177}
5178
5179static ssize_t rbd_pool_show(struct device *dev,
5180 struct device_attribute *attr, char *buf)
5181{
5182 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5183
5184 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5185}
5186
5187static ssize_t rbd_pool_id_show(struct device *dev,
5188 struct device_attribute *attr, char *buf)
5189{
5190 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5191
5192 return sprintf(buf, "%llu\n",
5193 (unsigned long long) rbd_dev->spec->pool_id);
5194}
5195
5196static ssize_t rbd_pool_ns_show(struct device *dev,
5197 struct device_attribute *attr, char *buf)
5198{
5199 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5200
5201 return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5202}
5203
5204static ssize_t rbd_name_show(struct device *dev,
5205 struct device_attribute *attr, char *buf)
5206{
5207 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5208
5209 if (rbd_dev->spec->image_name)
5210 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5211
5212 return sprintf(buf, "(unknown)\n");
5213}
5214
5215static ssize_t rbd_image_id_show(struct device *dev,
5216 struct device_attribute *attr, char *buf)
5217{
5218 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5219
5220 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5221}
5222
/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
5227static ssize_t rbd_snap_show(struct device *dev,
5228 struct device_attribute *attr,
5229 char *buf)
5230{
5231 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5232
5233 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5234}
5235
5236static ssize_t rbd_snap_id_show(struct device *dev,
5237 struct device_attribute *attr, char *buf)
5238{
5239 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5240
5241 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5242}
5243
/*
 * For a v2 image, shows the chain of parent images, separated by empty
 * lines.  For v1 images or if there is no parent, shows "(no parent
 * image)".
 */
5249static ssize_t rbd_parent_show(struct device *dev,
5250 struct device_attribute *attr,
5251 char *buf)
5252{
5253 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5254 ssize_t count = 0;
5255
5256 if (!rbd_dev->parent)
5257 return sprintf(buf, "(no parent image)\n");
5258
5259 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5260 struct rbd_spec *spec = rbd_dev->parent_spec;
5261
5262 count += sprintf(&buf[count], "%s"
5263 "pool_id %llu\npool_name %s\n"
5264 "pool_ns %s\n"
5265 "image_id %s\nimage_name %s\n"
5266 "snap_id %llu\nsnap_name %s\n"
5267 "overlap %llu\n",
5268 !count ? "" : "\n",
5269 spec->pool_id, spec->pool_name,
5270 spec->pool_ns ?: "",
5271 spec->image_id, spec->image_name ?: "(unknown)",
5272 spec->snap_id, spec->snap_name,
5273 rbd_dev->parent_overlap);
5274 }
5275
5276 return count;
5277}
5278
5279static ssize_t rbd_image_refresh(struct device *dev,
5280 struct device_attribute *attr,
5281 const char *buf,
5282 size_t size)
5283{
5284 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5285 int ret;
5286
5287 if (!capable(CAP_SYS_ADMIN))
5288 return -EPERM;
5289
5290 ret = rbd_dev_refresh(rbd_dev);
5291 if (ret)
5292 return ret;
5293
5294 return size;
5295}
5296
5297static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5298static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5299static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5300static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5301static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5302static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5303static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5304static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5305static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5306static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5307static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5308static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5309static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5310static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5311static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5312static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5313static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5314
5315static struct attribute *rbd_attrs[] = {
5316 &dev_attr_size.attr,
5317 &dev_attr_features.attr,
5318 &dev_attr_major.attr,
5319 &dev_attr_minor.attr,
5320 &dev_attr_client_addr.attr,
5321 &dev_attr_client_id.attr,
5322 &dev_attr_cluster_fsid.attr,
5323 &dev_attr_config_info.attr,
5324 &dev_attr_pool.attr,
5325 &dev_attr_pool_id.attr,
5326 &dev_attr_pool_ns.attr,
5327 &dev_attr_name.attr,
5328 &dev_attr_image_id.attr,
5329 &dev_attr_current_snap.attr,
5330 &dev_attr_snap_id.attr,
5331 &dev_attr_parent.attr,
5332 &dev_attr_refresh.attr,
5333 NULL
5334};
5335
5336static struct attribute_group rbd_attr_group = {
5337 .attrs = rbd_attrs,
5338};
5339
5340static const struct attribute_group *rbd_attr_groups[] = {
5341 &rbd_attr_group,
5342 NULL
5343};
5344
5345static void rbd_dev_release(struct device *dev);
5346
5347static const struct device_type rbd_device_type = {
5348 .name = "rbd",
5349 .groups = rbd_attr_groups,
5350 .release = rbd_dev_release,
5351};
5352
5353static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5354{
5355 kref_get(&spec->kref);
5356
5357 return spec;
5358}
5359
5360static void rbd_spec_free(struct kref *kref);
5361static void rbd_spec_put(struct rbd_spec *spec)
5362{
5363 if (spec)
5364 kref_put(&spec->kref, rbd_spec_free);
5365}
5366
5367static struct rbd_spec *rbd_spec_alloc(void)
5368{
5369 struct rbd_spec *spec;
5370
5371 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5372 if (!spec)
5373 return NULL;
5374
5375 spec->pool_id = CEPH_NOPOOL;
5376 spec->snap_id = CEPH_NOSNAP;
5377 kref_init(&spec->kref);
5378
5379 return spec;
5380}
5381
5382static void rbd_spec_free(struct kref *kref)
5383{
5384 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5385
5386 kfree(spec->pool_name);
5387 kfree(spec->pool_ns);
5388 kfree(spec->image_id);
5389 kfree(spec->image_name);
5390 kfree(spec->snap_name);
5391 kfree(spec);
5392}
5393
5394static void rbd_dev_free(struct rbd_device *rbd_dev)
5395{
5396 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5397 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5398
5399 ceph_oid_destroy(&rbd_dev->header_oid);
5400 ceph_oloc_destroy(&rbd_dev->header_oloc);
5401 kfree(rbd_dev->config_info);
5402
5403 rbd_put_client(rbd_dev->rbd_client);
5404 rbd_spec_put(rbd_dev->spec);
5405 kfree(rbd_dev->opts);
5406 kfree(rbd_dev);
5407}
5408
5409static void rbd_dev_release(struct device *dev)
5410{
5411 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5412 bool need_put = !!rbd_dev->opts;
5413
5414 if (need_put) {
5415 destroy_workqueue(rbd_dev->task_wq);
5416 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5417 }
5418
5419 rbd_dev_free(rbd_dev);
5420
	/*
	 * This is racy, but way better than putting module outside of
	 * the release callback.  The race window is really small, so
	 * doing something similar to dm (dm-builtin.c) is overkill.
	 */
5426 if (need_put)
5427 module_put(THIS_MODULE);
5428}
5429
5430static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5431 struct rbd_spec *spec)
5432{
5433 struct rbd_device *rbd_dev;
5434
5435 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5436 if (!rbd_dev)
5437 return NULL;
5438
5439 spin_lock_init(&rbd_dev->lock);
5440 INIT_LIST_HEAD(&rbd_dev->node);
5441 init_rwsem(&rbd_dev->header_rwsem);
5442
5443 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5444 ceph_oid_init(&rbd_dev->header_oid);
5445 rbd_dev->header_oloc.pool = spec->pool_id;
5446 if (spec->pool_ns) {
5447 WARN_ON(!*spec->pool_ns);
5448 rbd_dev->header_oloc.pool_ns =
5449 ceph_find_or_create_string(spec->pool_ns,
5450 strlen(spec->pool_ns));
5451 }
5452
5453 mutex_init(&rbd_dev->watch_mutex);
5454 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5455 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5456
5457 init_rwsem(&rbd_dev->lock_rwsem);
5458 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5459 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5460 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5461 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5462 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5463 spin_lock_init(&rbd_dev->lock_lists_lock);
5464 INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5465 INIT_LIST_HEAD(&rbd_dev->running_list);
5466 init_completion(&rbd_dev->acquire_wait);
5467 init_completion(&rbd_dev->releasing_wait);
5468
5469 spin_lock_init(&rbd_dev->object_map_lock);
5470
5471 rbd_dev->dev.bus = &rbd_bus_type;
5472 rbd_dev->dev.type = &rbd_device_type;
5473 rbd_dev->dev.parent = &rbd_root_dev;
5474 device_initialize(&rbd_dev->dev);
5475
5476 rbd_dev->rbd_client = rbdc;
5477 rbd_dev->spec = spec;
5478
5479 return rbd_dev;
5480}
5481
/*
 * Create a mapping rbd_dev.
 */
5485static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5486 struct rbd_spec *spec,
5487 struct rbd_options *opts)
5488{
5489 struct rbd_device *rbd_dev;
5490
5491 rbd_dev = __rbd_dev_create(rbdc, spec);
5492 if (!rbd_dev)
5493 return NULL;
5494
5495 rbd_dev->opts = opts;
5496
	/* get an id and fill in device name */
5498 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5499 minor_to_rbd_dev_id(1 << MINORBITS),
5500 GFP_KERNEL);
5501 if (rbd_dev->dev_id < 0)
5502 goto fail_rbd_dev;
5503
5504 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5505 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5506 rbd_dev->name);
5507 if (!rbd_dev->task_wq)
5508 goto fail_dev_id;
5509
	/* we have a ref from do_rbd_add() */
5511 __module_get(THIS_MODULE);
5512
5513 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5514 return rbd_dev;
5515
5516fail_dev_id:
5517 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5518fail_rbd_dev:
5519 rbd_dev_free(rbd_dev);
5520 return NULL;
5521}
5522
5523static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5524{
5525 if (rbd_dev)
5526 put_device(&rbd_dev->dev);
5527}
5528
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
5534static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5535 u8 *order, u64 *snap_size)
5536{
5537 __le64 snapid = cpu_to_le64(snap_id);
5538 int ret;
5539 struct {
5540 u8 order;
5541 __le64 size;
5542 } __attribute__ ((packed)) size_buf = { 0 };
5543
5544 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5545 &rbd_dev->header_oloc, "get_size",
5546 &snapid, sizeof(snapid),
5547 &size_buf, sizeof(size_buf));
5548 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5549 if (ret < 0)
5550 return ret;
5551 if (ret < sizeof (size_buf))
5552 return -ERANGE;
5553
5554 if (order) {
5555 *order = size_buf.order;
5556 dout(" order %u", (unsigned int)*order);
5557 }
5558 *snap_size = le64_to_cpu(size_buf.size);
5559
5560 dout(" snap_id 0x%016llx snap_size = %llu\n",
5561 (unsigned long long)snap_id,
5562 (unsigned long long)*snap_size);
5563
5564 return 0;
5565}
5566
5567static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5568{
5569 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5570 &rbd_dev->header.obj_order,
5571 &rbd_dev->header.image_size);
5572}
5573
5574static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5575{
5576 size_t size;
5577 void *reply_buf;
5578 int ret;
5579 void *p;
5580
	/* Response will be an encoded string, which includes a length */
5582 size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5583 reply_buf = kzalloc(size, GFP_KERNEL);
5584 if (!reply_buf)
5585 return -ENOMEM;
5586
5587 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5588 &rbd_dev->header_oloc, "get_object_prefix",
5589 NULL, 0, reply_buf, size);
5590 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5591 if (ret < 0)
5592 goto out;
5593
5594 p = reply_buf;
5595 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5596 p + ret, NULL, GFP_NOIO);
5597 ret = 0;
5598
5599 if (IS_ERR(rbd_dev->header.object_prefix)) {
5600 ret = PTR_ERR(rbd_dev->header.object_prefix);
5601 rbd_dev->header.object_prefix = NULL;
5602 } else {
5603 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5604 }
5605out:
5606 kfree(reply_buf);
5607
5608 return ret;
5609}
5610
5611static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5612 bool read_only, u64 *snap_features)
5613{
5614 struct {
5615 __le64 snap_id;
5616 u8 read_only;
5617 } features_in;
5618 struct {
5619 __le64 features;
5620 __le64 incompat;
5621 } __attribute__ ((packed)) features_buf = { 0 };
5622 u64 unsup;
5623 int ret;
5624
5625 features_in.snap_id = cpu_to_le64(snap_id);
5626 features_in.read_only = read_only;
5627
5628 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5629 &rbd_dev->header_oloc, "get_features",
5630 &features_in, sizeof(features_in),
5631 &features_buf, sizeof(features_buf));
5632 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5633 if (ret < 0)
5634 return ret;
5635 if (ret < sizeof (features_buf))
5636 return -ERANGE;
5637
5638 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5639 if (unsup) {
5640 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5641 unsup);
5642 return -ENXIO;
5643 }
5644
5645 *snap_features = le64_to_cpu(features_buf.features);
5646
5647 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5648 (unsigned long long)snap_id,
5649 (unsigned long long)*snap_features,
5650 (unsigned long long)le64_to_cpu(features_buf.incompat));
5651
5652 return 0;
5653}
5654
5655static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5656{
5657 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5658 rbd_is_ro(rbd_dev),
5659 &rbd_dev->header.features);
5660}
5661
/*
 * These are generic image flags, but since they are used only for
 * object map, store them in rbd_dev->object_map_flags.
 *
 * For the same reason, this function is called only on object map
 * (re)load and not on header refresh.
 */
5669static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5670{
5671 __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5672 __le64 flags;
5673 int ret;
5674
5675 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5676 &rbd_dev->header_oloc, "get_flags",
5677 &snapid, sizeof(snapid),
5678 &flags, sizeof(flags));
5679 if (ret < 0)
5680 return ret;
5681 if (ret < sizeof(flags))
5682 return -EBADMSG;
5683
5684 rbd_dev->object_map_flags = le64_to_cpu(flags);
5685 return 0;
5686}
5687
5688struct parent_image_info {
5689 u64 pool_id;
5690 const char *pool_ns;
5691 const char *image_id;
5692 u64 snap_id;
5693
5694 bool has_overlap;
5695 u64 overlap;
5696};
5697
/*
 * The caller is responsible for @pii.
 */
5701static int decode_parent_image_spec(void **p, void *end,
5702 struct parent_image_info *pii)
5703{
5704 u8 struct_v;
5705 u32 struct_len;
5706 int ret;
5707
5708 ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5709 &struct_v, &struct_len);
5710 if (ret)
5711 return ret;
5712
5713 ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5714 pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5715 if (IS_ERR(pii->pool_ns)) {
5716 ret = PTR_ERR(pii->pool_ns);
5717 pii->pool_ns = NULL;
5718 return ret;
5719 }
5720 pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5721 if (IS_ERR(pii->image_id)) {
5722 ret = PTR_ERR(pii->image_id);
5723 pii->image_id = NULL;
5724 return ret;
5725 }
5726 ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5727 return 0;
5728
5729e_inval:
5730 return -EINVAL;
5731}
5732
5733static int __get_parent_info(struct rbd_device *rbd_dev,
5734 struct page *req_page,
5735 struct page *reply_page,
5736 struct parent_image_info *pii)
5737{
5738 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5739 size_t reply_len = PAGE_SIZE;
5740 void *p, *end;
5741 int ret;
5742
5743 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5744 "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5745 req_page, sizeof(u64), &reply_page, &reply_len);
5746 if (ret)
5747 return ret == -EOPNOTSUPP ? 1 : ret;
5748
5749 p = page_address(reply_page);
5750 end = p + reply_len;
5751 ret = decode_parent_image_spec(&p, end, pii);
5752 if (ret)
5753 return ret;
5754
5755 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5756 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5757 req_page, sizeof(u64), &reply_page, &reply_len);
5758 if (ret)
5759 return ret;
5760
5761 p = page_address(reply_page);
5762 end = p + reply_len;
5763 ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5764 if (pii->has_overlap)
5765 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5766
5767 return 0;
5768
5769e_inval:
5770 return -EINVAL;
5771}
5772
/*
 * The caller is responsible for @pii.
 */
5776static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5777 struct page *req_page,
5778 struct page *reply_page,
5779 struct parent_image_info *pii)
5780{
5781 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5782 size_t reply_len = PAGE_SIZE;
5783 void *p, *end;
5784 int ret;
5785
5786 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5787 "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5788 req_page, sizeof(u64), &reply_page, &reply_len);
5789 if (ret)
5790 return ret;
5791
5792 p = page_address(reply_page);
5793 end = p + reply_len;
5794 ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5795 pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5796 if (IS_ERR(pii->image_id)) {
5797 ret = PTR_ERR(pii->image_id);
5798 pii->image_id = NULL;
5799 return ret;
5800 }
5801 ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5802 pii->has_overlap = true;
5803 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5804
5805 return 0;
5806
5807e_inval:
5808 return -EINVAL;
5809}
5810
5811static int get_parent_info(struct rbd_device *rbd_dev,
5812 struct parent_image_info *pii)
5813{
5814 struct page *req_page, *reply_page;
5815 void *p;
5816 int ret;
5817
5818 req_page = alloc_page(GFP_KERNEL);
5819 if (!req_page)
5820 return -ENOMEM;
5821
5822 reply_page = alloc_page(GFP_KERNEL);
5823 if (!reply_page) {
5824 __free_page(req_page);
5825 return -ENOMEM;
5826 }
5827
5828 p = page_address(req_page);
5829 ceph_encode_64(&p, rbd_dev->spec->snap_id);
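	/*
	 * __get_parent_info() returns 1 if the "parent_get" method is
	 * unsupported by the OSD, in which case fall back to the
	 * legacy "get_parent" method.
	 */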
5830 ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5831 if (ret > 0)
5832 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5833 pii);
5834
5835 __free_page(req_page);
5836 __free_page(reply_page);
5837 return ret;
5838}
5839
5840static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5841{
5842 struct rbd_spec *parent_spec;
5843 struct parent_image_info pii = { 0 };
5844 int ret;
5845
5846 parent_spec = rbd_spec_alloc();
5847 if (!parent_spec)
5848 return -ENOMEM;
5849
5850 ret = get_parent_info(rbd_dev, &pii);
5851 if (ret)
5852 goto out_err;
5853
5854 dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5855 __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5856 pii.has_overlap, pii.overlap);
5857
5858 if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 *
		 * If !pii.has_overlap, the parent image spec is not
		 * applicable.  It's there to avoid duplication in each
		 * snapshot record.
		 */
5872 if (rbd_dev->parent_overlap) {
5873 rbd_dev->parent_overlap = 0;
5874 rbd_dev_parent_put(rbd_dev);
5875 pr_info("%s: clone image has been flattened\n",
5876 rbd_dev->disk->disk_name);
5877 }
5878
5879 goto out;
5880 }
5881
	/* The ceph file layout needs to fit pool id in 32 bits */
5883
5884 ret = -EIO;
5885 if (pii.pool_id > (u64)U32_MAX) {
5886 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5887 (unsigned long long)pii.pool_id, U32_MAX);
5888 goto out_err;
5889 }
5890
	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec if we haven't already done so.
	 */
5896 if (!rbd_dev->parent_spec) {
5897 parent_spec->pool_id = pii.pool_id;
5898 if (pii.pool_ns && *pii.pool_ns) {
5899 parent_spec->pool_ns = pii.pool_ns;
5900 pii.pool_ns = NULL;
5901 }
5902 parent_spec->image_id = pii.image_id;
5903 pii.image_id = NULL;
5904 parent_spec->snap_id = pii.snap_id;
5905
5906 rbd_dev->parent_spec = parent_spec;
5907 parent_spec = NULL;
5908 }
5909
	/*
	 * We always update the parent overlap.  If it's zero we issue
	 * a warning, as we will proceed as if there was no parent.
	 */
5914 if (!pii.overlap) {
5915 if (parent_spec) {
			/* refresh, careful to warn just once */
5917 if (rbd_dev->parent_overlap)
5918 rbd_warn(rbd_dev,
5919 "clone now standalone (overlap became 0)");
5920 } else {
			/* initial probe */
5922 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5923 }
5924 }
5925 rbd_dev->parent_overlap = pii.overlap;
5926
5927out:
5928 ret = 0;
5929out_err:
5930 kfree(pii.pool_ns);
5931 kfree(pii.image_id);
5932 rbd_spec_put(parent_spec);
5933 return ret;
5934}
5935
5936static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5937{
5938 struct {
5939 __le64 stripe_unit;
5940 __le64 stripe_count;
5941 } __attribute__ ((packed)) striping_info_buf = { 0 };
5942 size_t size = sizeof (striping_info_buf);
5943 void *p;
5944 int ret;
5945
5946 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5947 &rbd_dev->header_oloc, "get_stripe_unit_count",
5948 NULL, 0, &striping_info_buf, size);
5949 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5950 if (ret < 0)
5951 return ret;
5952 if (ret < size)
5953 return -ERANGE;
5954
5955 p = &striping_info_buf;
5956 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5957 rbd_dev->header.stripe_count = ceph_decode_64(&p);
5958 return 0;
5959}
5960
5961static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5962{
5963 __le64 data_pool_id;
5964 int ret;
5965
5966 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5967 &rbd_dev->header_oloc, "get_data_pool",
5968 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5969 if (ret < 0)
5970 return ret;
5971 if (ret < sizeof(data_pool_id))
5972 return -EBADMSG;
5973
5974 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5975 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5976 return 0;
5977}
5978
5979static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5980{
5981 CEPH_DEFINE_OID_ONSTACK(oid);
5982 size_t image_id_size;
5983 char *image_id;
5984 void *p;
5985 void *end;
5986 size_t size;
5987 void *reply_buf = NULL;
5988 size_t len = 0;
5989 char *image_name = NULL;
5990 int ret;
5991
5992 rbd_assert(!rbd_dev->spec->image_name);
5993
5994 len = strlen(rbd_dev->spec->image_id);
5995 image_id_size = sizeof (__le32) + len;
5996 image_id = kmalloc(image_id_size, GFP_KERNEL);
5997 if (!image_id)
5998 return NULL;
5999
6000 p = image_id;
6001 end = image_id + image_id_size;
6002 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6003
6004 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6005 reply_buf = kmalloc(size, GFP_KERNEL);
6006 if (!reply_buf)
6007 goto out;
6008
6009 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6010 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6011 "dir_get_name", image_id, image_id_size,
6012 reply_buf, size);
6013 if (ret < 0)
6014 goto out;
6015 p = reply_buf;
6016 end = reply_buf + ret;
6017
6018 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6019 if (IS_ERR(image_name))
6020 image_name = NULL;
6021 else
6022 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6023out:
6024 kfree(reply_buf);
6025 kfree(image_id);
6026
6027 return image_name;
6028}
6029
6030static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6031{
6032 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6033 const char *snap_name;
6034 u32 which = 0;
6035
	/* Skip over names until we find the one we are looking for */
6037
6038 snap_name = rbd_dev->header.snap_names;
6039 while (which < snapc->num_snaps) {
6040 if (!strcmp(name, snap_name))
6041 return snapc->snaps[which];
6042 snap_name += strlen(snap_name) + 1;
6043 which++;
6044 }
6045 return CEPH_NOSNAP;
6046}
6047
6048static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6049{
6050 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6051 u32 which;
6052 bool found = false;
6053 u64 snap_id;
6054
6055 for (which = 0; !found && which < snapc->num_snaps; which++) {
6056 const char *snap_name;
6057
6058 snap_id = snapc->snaps[which];
6059 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6060 if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
6062 if (PTR_ERR(snap_name) == -ENOENT)
6063 continue;
6064 else
6065 break;
6066 }
6067 found = !strcmp(name, snap_name);
6068 kfree(snap_name);
6069 }
6070 return found ? snap_id : CEPH_NOSNAP;
6071}
6072
/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
6077static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6078{
6079 if (rbd_dev->image_format == 1)
6080 return rbd_v1_snap_id_by_name(rbd_dev, name);
6081
6082 return rbd_v2_snap_id_by_name(rbd_dev, name);
6083}
6084
/*
 * An image being mapped will have everything but the snap id.
 */
6088static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6089{
6090 struct rbd_spec *spec = rbd_dev->spec;
6091
6092 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6093 rbd_assert(spec->image_id && spec->image_name);
6094 rbd_assert(spec->snap_name);
6095
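	/* the mapped name "-" (RBD_SNAP_HEAD_NAME) means the image head */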
6096 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6097 u64 snap_id;
6098
6099 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6100 if (snap_id == CEPH_NOSNAP)
6101 return -ENOENT;
6102
6103 spec->snap_id = snap_id;
6104 } else {
6105 spec->snap_id = CEPH_NOSNAP;
6106 }
6107
6108 return 0;
6109}
6110
/*
 * A parent image will have all ids but none of the names.
 *
 * All names in an rbd spec are dynamically allocated.  It's OK if we
 * can't figure out the name for an image id.
 */
6117static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6118{
6119 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6120 struct rbd_spec *spec = rbd_dev->spec;
6121 const char *pool_name;
6122 const char *image_name;
6123 const char *snap_name;
6124 int ret;
6125
6126 rbd_assert(spec->pool_id != CEPH_NOPOOL);
6127 rbd_assert(spec->image_id);
6128 rbd_assert(spec->snap_id != CEPH_NOSNAP);
6129
	/* Get the pool name; we have to make our own copy of this */
6131
6132 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6133 if (!pool_name) {
6134 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6135 return -EIO;
6136 }
6137 pool_name = kstrdup(pool_name, GFP_KERNEL);
6138 if (!pool_name)
6139 return -ENOMEM;
6140
	/* Fetch the image name; tolerate failure here */
6142
6143 image_name = rbd_dev_image_name(rbd_dev);
6144 if (!image_name)
6145 rbd_warn(rbd_dev, "unable to get image name");
6146
	/* Fetch the snapshot name */
6148
6149 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6150 if (IS_ERR(snap_name)) {
6151 ret = PTR_ERR(snap_name);
6152 goto out_err;
6153 }
6154
6155 spec->pool_name = pool_name;
6156 spec->image_name = image_name;
6157 spec->snap_name = snap_name;
6158
6159 return 0;
6160
6161out_err:
6162 kfree(image_name);
6163 kfree(pool_name);
6164 return ret;
6165}
6166
6167static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6168{
6169 size_t size;
6170 int ret;
6171 void *reply_buf;
6172 void *p;
6173 void *end;
6174 u64 seq;
6175 u32 snap_count;
6176 struct ceph_snap_context *snapc;
6177 u32 i;
6178
	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
6185 size = sizeof (__le64) + sizeof (__le32) +
6186 RBD_MAX_SNAP_COUNT * sizeof (__le64);
6187 reply_buf = kzalloc(size, GFP_KERNEL);
6188 if (!reply_buf)
6189 return -ENOMEM;
6190
6191 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6192 &rbd_dev->header_oloc, "get_snapcontext",
6193 NULL, 0, reply_buf, size);
6194 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6195 if (ret < 0)
6196 goto out;
6197
6198 p = reply_buf;
6199 end = reply_buf + ret;
6200 ret = -ERANGE;
6201 ceph_decode_64_safe(&p, end, seq, out);
6202 ceph_decode_32_safe(&p, end, snap_count, out);
6203
	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
6210 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6211 / sizeof (u64)) {
6212 ret = -EINVAL;
6213 goto out;
6214 }
6215 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6216 goto out;
6217 ret = 0;
6218
6219 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6220 if (!snapc) {
6221 ret = -ENOMEM;
6222 goto out;
6223 }
6224 snapc->seq = seq;
6225 for (i = 0; i < snap_count; i++)
6226 snapc->snaps[i] = ceph_decode_64(&p);
6227
6228 ceph_put_snap_context(rbd_dev->header.snapc);
6229 rbd_dev->header.snapc = snapc;
6230
6231 dout(" snap context seq = %llu, snap_count = %u\n",
6232 (unsigned long long)seq, (unsigned int)snap_count);
6233out:
6234 kfree(reply_buf);
6235
6236 return ret;
6237}
6238
6239static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6240 u64 snap_id)
6241{
6242 size_t size;
6243 void *reply_buf;
6244 __le64 snapid;
6245 int ret;
6246 void *p;
6247 void *end;
6248 char *snap_name;
6249
6250 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6251 reply_buf = kmalloc(size, GFP_KERNEL);
6252 if (!reply_buf)
6253 return ERR_PTR(-ENOMEM);
6254
6255 snapid = cpu_to_le64(snap_id);
6256 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6257 &rbd_dev->header_oloc, "get_snapshot_name",
6258 &snapid, sizeof(snapid), reply_buf, size);
6259 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6260 if (ret < 0) {
6261 snap_name = ERR_PTR(ret);
6262 goto out;
6263 }
6264
6265 p = reply_buf;
6266 end = reply_buf + ret;
6267 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6268 if (IS_ERR(snap_name))
6269 goto out;
6270
6271 dout(" snap_id 0x%016llx snap_name = %s\n",
6272 (unsigned long long)snap_id, snap_name);
6273out:
6274 kfree(reply_buf);
6275
6276 return snap_name;
6277}
6278
6279static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6280{
6281 bool first_time = rbd_dev->header.object_prefix == NULL;
6282 int ret;
6283
6284 ret = rbd_dev_v2_image_size(rbd_dev);
6285 if (ret)
6286 return ret;
6287
6288 if (first_time) {
6289 ret = rbd_dev_v2_header_onetime(rbd_dev);
6290 if (ret)
6291 return ret;
6292 }
6293
6294 ret = rbd_dev_v2_snap_context(rbd_dev);
6295 if (ret && first_time) {
6296 kfree(rbd_dev->header.object_prefix);
6297 rbd_dev->header.object_prefix = NULL;
6298 }
6299
6300 return ret;
6301}
6302
6303static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6304{
6305 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6306
6307 if (rbd_dev->image_format == 1)
6308 return rbd_dev_v1_header_info(rbd_dev);
6309
6310 return rbd_dev_v2_header_info(rbd_dev);
6311}
6312
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-white space character (if any).  Returns the length
 * of the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
6319static inline size_t next_token(const char **buf)
6320{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
6325 const char *spaces = " \f\n\r\t\v";
6326
6327 *buf += strspn(*buf, spaces);
6328
6329 return strcspn(*buf, spaces);
6330}
6331
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
6348static inline char *dup_token(const char **buf, size_t *lenp)
6349{
6350 char *dup;
6351 size_t len;
6352
6353 len = next_token(buf);
6354 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6355 if (!dup)
6356 return NULL;
6357 *(dup + len) = '\0';
6358 *buf += len;
6359
6360 if (lenp)
6361 *lenp = len;
6362
6363 return dup;
6364}
6365
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
6407static int rbd_add_parse_args(const char *buf,
6408 struct ceph_options **ceph_opts,
6409 struct rbd_options **opts,
6410 struct rbd_spec **rbd_spec)
6411{
6412 size_t len;
6413 char *options;
6414 const char *mon_addrs;
6415 char *snap_name;
6416 size_t mon_addrs_size;
6417 struct parse_rbd_opts_ctx pctx = { 0 };
6418 struct ceph_options *copts;
6419 int ret;
6420
	/* The first four tokens are required */
6422
6423 len = next_token(&buf);
6424 if (!len) {
6425 rbd_warn(NULL, "no monitor address(es) provided");
6426 return -EINVAL;
6427 }
6428 mon_addrs = buf;
6429 mon_addrs_size = len + 1;
6430 buf += len;
6431
6432 ret = -EINVAL;
6433 options = dup_token(&buf, NULL);
6434 if (!options)
6435 return -ENOMEM;
6436 if (!*options) {
6437 rbd_warn(NULL, "no options provided");
6438 goto out_err;
6439 }
6440
6441 pctx.spec = rbd_spec_alloc();
6442 if (!pctx.spec)
6443 goto out_mem;
6444
6445 pctx.spec->pool_name = dup_token(&buf, NULL);
6446 if (!pctx.spec->pool_name)
6447 goto out_mem;
6448 if (!*pctx.spec->pool_name) {
6449 rbd_warn(NULL, "no pool name provided");
6450 goto out_err;
6451 }
6452
6453 pctx.spec->image_name = dup_token(&buf, NULL);
6454 if (!pctx.spec->image_name)
6455 goto out_mem;
6456 if (!*pctx.spec->image_name) {
6457 rbd_warn(NULL, "no image name provided");
6458 goto out_err;
6459 }
6460
	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
6465 len = next_token(&buf);
6466 if (!len) {
6467 buf = RBD_SNAP_HEAD_NAME;
6468 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6469 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6470 ret = -ENAMETOOLONG;
6471 goto out_err;
6472 }
6473 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6474 if (!snap_name)
6475 goto out_mem;
6476 *(snap_name + len) = '\0';
6477 pctx.spec->snap_name = snap_name;
6478
	/* Initialize all rbd options to the defaults */
6480
6481 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6482 if (!pctx.opts)
6483 goto out_mem;
6484
6485 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6486 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6487 pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6488 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6489 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6490 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6491 pctx.opts->trim = RBD_TRIM_DEFAULT;
6492
6493 copts = ceph_parse_options(options, mon_addrs,
6494 mon_addrs + mon_addrs_size - 1,
6495 parse_rbd_opts_token, &pctx);
6496 if (IS_ERR(copts)) {
6497 ret = PTR_ERR(copts);
6498 goto out_err;
6499 }
6500 kfree(options);
6501
6502 *ceph_opts = copts;
6503 *opts = pctx.opts;
6504 *rbd_spec = pctx.spec;
6505
6506 return 0;
6507out_mem:
6508 ret = -ENOMEM;
6509out_err:
6510 kfree(pctx.opts);
6511 rbd_spec_put(pctx.spec);
6512 kfree(options);
6513
6514 return ret;
6515}
6516
6517static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6518{
6519 down_write(&rbd_dev->lock_rwsem);
6520 if (__rbd_is_lock_owner(rbd_dev))
6521 __rbd_release_lock(rbd_dev);
6522 up_write(&rbd_dev->lock_rwsem);
6523}
6524
/*
 * If the wait is interrupted, an error is returned even if the lock
 * was successfully acquired.  rbd_dev_image_unlock() will release it
 * if needed.
 */
6530static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6531{
6532 long ret;
6533
6534 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6535 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6536 return 0;
6537
6538 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6539 return -EINVAL;
6540 }
6541
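	/* read-only mappings never write and don't need the lock */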
6542 if (rbd_is_ro(rbd_dev))
6543 return 0;
6544
6545 rbd_assert(!rbd_is_lock_owner(rbd_dev));
6546 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6547 ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6548 ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6549 if (ret > 0) {
6550 ret = rbd_dev->acquire_err;
6551 } else {
6552 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6553 if (!ret)
6554 ret = -ETIMEDOUT;
6555 }
6556
6557 if (ret) {
6558 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6559 return ret;
6560 }
6561
	/*
	 * The lock may have been released by now, unless automatic lock
	 * transitions are disabled.
	 */
6566 rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6567 return 0;
6568}
6569
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the given name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (or will be set to NULL).
 */
6584static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6585{
6586 int ret;
6587 size_t size;
6588 CEPH_DEFINE_OID_ONSTACK(oid);
6589 void *response;
6590 char *image_id;
6591
	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
6598 if (rbd_dev->spec->image_id) {
6599 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6600
6601 return 0;
6602 }
6603
	/*
	 * First, see if the format 2 image id object exists, and if
	 * so, get the image's persistent id from it.
	 */
6608 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6609 rbd_dev->spec->image_name);
6610 if (ret)
6611 return ret;
6612
6613 dout("rbd id object name is %s\n", oid.name);
6614
	/* Response will be an encoded string, which includes a length */
6616 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6617 response = kzalloc(size, GFP_NOIO);
6618 if (!response) {
6619 ret = -ENOMEM;
6620 goto out;
6621 }
6622
	/* If it doesn't exist we'll assume it's a format 1 image */
6624
6625 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6626 "get_id", NULL, 0,
6627 response, size);
6628 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6629 if (ret == -ENOENT) {
6630 image_id = kstrdup("", GFP_KERNEL);
6631 ret = image_id ? 0 : -ENOMEM;
6632 if (!ret)
6633 rbd_dev->image_format = 1;
6634 } else if (ret >= 0) {
6635 void *p = response;
6636
6637 image_id = ceph_extract_encoded_string(&p, p + ret,
6638 NULL, GFP_NOIO);
6639 ret = PTR_ERR_OR_ZERO(image_id);
6640 if (!ret)
6641 rbd_dev->image_format = 2;
6642 }
6643
6644 if (!ret) {
6645 rbd_dev->spec->image_id = image_id;
6646 dout("image_id is %s\n", image_id);
6647 }
6648out:
6649 kfree(response);
6650 ceph_oid_destroy(&oid);
6651 return ret;
6652}
6653
/*
 * Undo whatever state changes are made by v1 or v2 header info
 * routines.
 */
6658static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6659{
6660 struct rbd_image_header *header;
6661
6662 rbd_dev_parent_put(rbd_dev);
6663 rbd_object_map_free(rbd_dev);
6664 rbd_dev_mapping_clear(rbd_dev);
6665
	/* Free dynamic fields from the header, then zero it out */
6667
6668 header = &rbd_dev->header;
6669 ceph_put_snap_context(header->snapc);
6670 kfree(header->snap_sizes);
6671 kfree(header->snap_names);
6672 kfree(header->object_prefix);
6673 memset(header, 0, sizeof (*header));
6674}
6675
6676static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6677{
6678 int ret;
6679
6680 ret = rbd_dev_v2_object_prefix(rbd_dev);
6681 if (ret)
6682 goto out_err;
6683
	/*
	 * Get and check features for the image.  Currently the
	 * features are assumed to never change.
	 */
6688 ret = rbd_dev_v2_features(rbd_dev);
6689 if (ret)
6690 goto out_err;
6691
	/* If the image supports fancy striping, get its parameters */
6693
6694 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6695 ret = rbd_dev_v2_striping_info(rbd_dev);
6696 if (ret < 0)
6697 goto out_err;
6698 }
6699
6700 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6701 ret = rbd_dev_v2_data_pool(rbd_dev);
6702 if (ret)
6703 goto out_err;
6704 }
6705
6706 rbd_init_layout(rbd_dev);
6707 return 0;
6708
6709out_err:
6710 rbd_dev->header.features = 0;
6711 kfree(rbd_dev->header.object_prefix);
6712 rbd_dev->header.object_prefix = NULL;
6713 return ret;
6714}
6715
/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
6721static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6722{
6723 struct rbd_device *parent = NULL;
6724 int ret;
6725
6726 if (!rbd_dev->parent_spec)
6727 return 0;
6728
6729 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6730 pr_info("parent chain is too long (%d)\n", depth);
6731 ret = -EINVAL;
6732 goto out_err;
6733 }
6734
6735 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6736 if (!parent) {
6737 ret = -ENOMEM;
6738 goto out_err;
6739 }
6740
	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */
6745 __rbd_get_client(rbd_dev->rbd_client);
6746 rbd_spec_get(rbd_dev->parent_spec);
6747
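	/* parent images are never written to, so map them read-only */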
6748 __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6749
6750 ret = rbd_dev_image_probe(parent, depth);
6751 if (ret < 0)
6752 goto out_err;
6753
6754 rbd_dev->parent = parent;
6755 atomic_set(&rbd_dev->parent_ref, 1);
6756 return 0;
6757
6758out_err:
6759 rbd_dev_unparent(rbd_dev);
6760 rbd_dev_destroy(parent);
6761 return ret;
6762}
6763
6764static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6765{
6766 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6767 rbd_free_disk(rbd_dev);
6768 if (!single_major)
6769 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6770}
6771
/*
 * Called with header_rwsem held for write; drops it before returning.
 */
6776static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6777{
6778 int ret;
6779
	/* Record our major and minor device numbers. */
6781
6782 if (!single_major) {
6783 ret = register_blkdev(0, rbd_dev->name);
6784 if (ret < 0)
6785 goto err_out_unlock;
6786
6787 rbd_dev->major = ret;
6788 rbd_dev->minor = 0;
6789 } else {
6790 rbd_dev->major = rbd_major;
6791 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6792 }
6793
	/* Set up the blkdev mapping. */
6795
	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));

	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	if (ret)
		goto err_out_disk;

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);
	return 0;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

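	/* Record the header object name for this rbd image. */
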
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
{
	if (!is_snap) {
		pr_info("image %s/%s%s%s does not exist\n",
			rbd_dev->spec->pool_name,
			rbd_dev->spec->pool_ns ?: "",
			rbd_dev->spec->pool_ns ? "/" : "",
			rbd_dev->spec->image_name);
	} else {
		pr_info("snap %s/%s%s%s@%s does not exist\n",
			rbd_dev->spec->pool_name,
			rbd_dev->spec->pool_ns ?: "",
			rbd_dev->spec->pool_ns ? "/" : "",
			rbd_dev->spec->image_name,
			rbd_dev->spec->snap_name);
	}
}

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	if (!rbd_is_ro(rbd_dev))
		rbd_unregister_watch(rbd_dev);

	rbd_dev_unprobe(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
}

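/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 *
 * On success, returns with header_rwsem held for write if called
 * with @depth == 0.
 */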
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	bool need_watch = !rbd_is_ro(rbd_dev);
	int ret;

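	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */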
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (need_watch) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				rbd_print_dne(rbd_dev, false);
			goto err_out_format;
		}
	}

	if (!depth)
		down_write(&rbd_dev->header_rwsem);

	ret = rbd_dev_header_info(rbd_dev);
	if (ret) {
		if (ret == -ENOENT && !need_watch)
			rbd_print_dne(rbd_dev, false);
		goto err_out_probe;
	}

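	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */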
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			rbd_print_dne(rbd_dev, true);
		goto err_out_probe;
	}

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_probe;

	if (rbd_is_snap(rbd_dev) &&
	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
		ret = rbd_object_map_load(rbd_dev);
		if (ret)
			goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
	     rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	if (!depth)
		up_write(&rbd_dev->header_rwsem);
	if (need_watch)
		rbd_unregister_watch(rbd_dev);
	rbd_dev_unprobe(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

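/*
 * Handle a write to /sys/bus/rbd/add (or, with single_major,
 * /sys/bus/rbd/add_single_major).  A typical invocation looks like
 * the following; the monitor address, credentials, pool and image
 * names are illustrative only:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbdpool myimage" \
 *       > /sys/bus/rbd/add
 */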
static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	int rc;

	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

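	/* parse add command */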
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

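	/* pick the pool */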
	rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

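	/* if we are mapping a snapshot it will be a read-only mapping */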
	if (rbd_dev->opts->read_only ||
	    strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
		__set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0)
		goto err_out_rbd_dev;

	if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
		rbd_warn(rbd_dev, "alloc_size adjusted to %u",
			 rbd_dev->layout.object_size);
		rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
	}

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc)
		goto err_out_image_probe;

	rc = rbd_add_acquire_lock(rbd_dev);
	if (rc)
		goto err_out_image_lock;

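	/* everything's ready */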
	rc = device_add(&rbd_dev->dev);
	if (rc)
		goto err_out_image_lock;

	device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
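	/* see rbd_init_disk() */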
	blk_put_queue(rbd_dev->disk->queue);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);
	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_image_lock:
	rbd_dev_image_unlock(rbd_dev);
	rbd_dev_device_release(rbd_dev);
err_out_image_probe:
	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

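		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */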
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		rbd_dev_destroy(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

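/*
 * Handle a write to /sys/bus/rbd/remove (or, with single_major,
 * /sys/bus/rbd/remove_single_major).  The device id (0 below is
 * illustrative) is the one assigned when the image was mapped:
 *
 *   $ echo 0 > /sys/bus/rbd/remove
 *   $ echo "0 force" > /sys/bus/rbd/remove
 */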
static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	char opt_buf[6];
	bool force = false;
	int ret;

	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	dev_id = -1;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
	if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count && !force)
			ret = -EBUSY;
		else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
					  &rbd_dev->flags))
			ret = -EINPROGRESS;
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret)
		return ret;

	if (force) {
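		/*
		 * Prevent new IO from being queued and wait for existing
		 * IO to complete/fail.
		 */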
		blk_mq_freeze_queue(rbd_dev->disk->queue);
		blk_set_queue_dying(rbd_dev->disk->queue);
	}

	del_gendisk(rbd_dev->disk);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);
	device_del(&rbd_dev->dev);

	rbd_dev_image_unlock(rbd_dev);
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	rbd_dev_destroy(rbd_dev);
	return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

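/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */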
static int __init rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void __exit rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	return 0;

out_err:
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

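	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */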
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
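/* following authorship retained from original osdblk.c */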
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");