1
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/fs.h>
5#include <linux/wait.h>
6#include <linux/slab.h>
7#include <linux/gfp.h>
8#include <linux/sched.h>
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
11#include <linux/ratelimit.h>
12#include <linux/bits.h>
13
14#include "super.h"
15#include "mds_client.h"
16
17#include <linux/ceph/ceph_features.h>
18#include <linux/ceph/messenger.h>
19#include <linux/ceph/decode.h>
20#include <linux/ceph/pagelist.h>
21#include <linux/ceph/auth.h>
22#include <linux/ceph/debugfs.h>
23
24#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51struct ceph_reconnect_state {
52 struct ceph_mds_session *session;
53 int nr_caps, nr_realms;
54 struct ceph_pagelist *pagelist;
55 unsigned msg_version;
56 bool allow_multi;
57};
58
59static void __wake_requests(struct ceph_mds_client *mdsc,
60 struct list_head *head);
61static void ceph_cap_release_work(struct work_struct *work);
62static void ceph_cap_reclaim_work(struct work_struct *work);
63
64static const struct ceph_connection_operations mds_con_ops;
65
66
67
68
69
70
71static int parse_reply_info_quota(void **p, void *end,
72 struct ceph_mds_reply_info_in *info)
73{
74 u8 struct_v, struct_compat;
75 u32 struct_len;
76
77 ceph_decode_8_safe(p, end, struct_v, bad);
78 ceph_decode_8_safe(p, end, struct_compat, bad);
79
80
81 if (!struct_v || struct_compat != 1)
82 goto bad;
83 ceph_decode_32_safe(p, end, struct_len, bad);
84 ceph_decode_need(p, end, struct_len, bad);
85 end = *p + struct_len;
86 ceph_decode_64_safe(p, end, info->max_bytes, bad);
87 ceph_decode_64_safe(p, end, info->max_files, bad);
88 *p = end;
89 return 0;
90bad:
91 return -EIO;
92}
93
94
95
96
97static int parse_reply_info_in(void **p, void *end,
98 struct ceph_mds_reply_info_in *info,
99 u64 features)
100{
101 int err = 0;
102 u8 struct_v = 0;
103
104 if (features == (u64)-1) {
105 u32 struct_len;
106 u8 struct_compat;
107 ceph_decode_8_safe(p, end, struct_v, bad);
108 ceph_decode_8_safe(p, end, struct_compat, bad);
109
110
111 if (!struct_v || struct_compat != 1)
112 goto bad;
113 ceph_decode_32_safe(p, end, struct_len, bad);
114 ceph_decode_need(p, end, struct_len, bad);
115 end = *p + struct_len;
116 }
117
118 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
119 info->in = *p;
120 *p += sizeof(struct ceph_mds_reply_inode) +
121 sizeof(*info->in->fragtree.splits) *
122 le32_to_cpu(info->in->fragtree.nsplits);
123
124 ceph_decode_32_safe(p, end, info->symlink_len, bad);
125 ceph_decode_need(p, end, info->symlink_len, bad);
126 info->symlink = *p;
127 *p += info->symlink_len;
128
129 ceph_decode_copy_safe(p, end, &info->dir_layout,
130 sizeof(info->dir_layout), bad);
131 ceph_decode_32_safe(p, end, info->xattr_len, bad);
132 ceph_decode_need(p, end, info->xattr_len, bad);
133 info->xattr_data = *p;
134 *p += info->xattr_len;
135
136 if (features == (u64)-1) {
137
138 ceph_decode_64_safe(p, end, info->inline_version, bad);
139 ceph_decode_32_safe(p, end, info->inline_len, bad);
140 ceph_decode_need(p, end, info->inline_len, bad);
141 info->inline_data = *p;
142 *p += info->inline_len;
143
144 err = parse_reply_info_quota(p, end, info);
145 if (err < 0)
146 goto out_bad;
147
148 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
149 if (info->pool_ns_len > 0) {
150 ceph_decode_need(p, end, info->pool_ns_len, bad);
151 info->pool_ns_data = *p;
152 *p += info->pool_ns_len;
153 }
154
155
156 ceph_decode_need(p, end, sizeof(info->btime), bad);
157 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
158
159
160 ceph_decode_64_safe(p, end, info->change_attr, bad);
161
162
163 if (struct_v >= 2) {
164 ceph_decode_32_safe(p, end, info->dir_pin, bad);
165 } else {
166 info->dir_pin = -ENODATA;
167 }
168
169
170 if (struct_v >= 3) {
171 ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
172 ceph_decode_copy(p, &info->snap_btime,
173 sizeof(info->snap_btime));
174 } else {
175 memset(&info->snap_btime, 0, sizeof(info->snap_btime));
176 }
177
178 *p = end;
179 } else {
180 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
181 ceph_decode_64_safe(p, end, info->inline_version, bad);
182 ceph_decode_32_safe(p, end, info->inline_len, bad);
183 ceph_decode_need(p, end, info->inline_len, bad);
184 info->inline_data = *p;
185 *p += info->inline_len;
186 } else
187 info->inline_version = CEPH_INLINE_NONE;
188
189 if (features & CEPH_FEATURE_MDS_QUOTA) {
190 err = parse_reply_info_quota(p, end, info);
191 if (err < 0)
192 goto out_bad;
193 } else {
194 info->max_bytes = 0;
195 info->max_files = 0;
196 }
197
198 info->pool_ns_len = 0;
199 info->pool_ns_data = NULL;
200 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
201 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
202 if (info->pool_ns_len > 0) {
203 ceph_decode_need(p, end, info->pool_ns_len, bad);
204 info->pool_ns_data = *p;
205 *p += info->pool_ns_len;
206 }
207 }
208
209 if (features & CEPH_FEATURE_FS_BTIME) {
210 ceph_decode_need(p, end, sizeof(info->btime), bad);
211 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
212 ceph_decode_64_safe(p, end, info->change_attr, bad);
213 }
214
215 info->dir_pin = -ENODATA;
216
217 }
218 return 0;
219bad:
220 err = -EIO;
221out_bad:
222 return err;
223}
224
225static int parse_reply_info_dir(void **p, void *end,
226 struct ceph_mds_reply_dirfrag **dirfrag,
227 u64 features)
228{
229 if (features == (u64)-1) {
230 u8 struct_v, struct_compat;
231 u32 struct_len;
232 ceph_decode_8_safe(p, end, struct_v, bad);
233 ceph_decode_8_safe(p, end, struct_compat, bad);
234
235
236 if (!struct_v || struct_compat != 1)
237 goto bad;
238 ceph_decode_32_safe(p, end, struct_len, bad);
239 ceph_decode_need(p, end, struct_len, bad);
240 end = *p + struct_len;
241 }
242
243 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
244 *dirfrag = *p;
245 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
246 if (unlikely(*p > end))
247 goto bad;
248 if (features == (u64)-1)
249 *p = end;
250 return 0;
251bad:
252 return -EIO;
253}
254
255static int parse_reply_info_lease(void **p, void *end,
256 struct ceph_mds_reply_lease **lease,
257 u64 features)
258{
259 if (features == (u64)-1) {
260 u8 struct_v, struct_compat;
261 u32 struct_len;
262 ceph_decode_8_safe(p, end, struct_v, bad);
263 ceph_decode_8_safe(p, end, struct_compat, bad);
264
265
266 if (!struct_v || struct_compat != 1)
267 goto bad;
268 ceph_decode_32_safe(p, end, struct_len, bad);
269 ceph_decode_need(p, end, struct_len, bad);
270 end = *p + struct_len;
271 }
272
273 ceph_decode_need(p, end, sizeof(**lease), bad);
274 *lease = *p;
275 *p += sizeof(**lease);
276 if (features == (u64)-1)
277 *p = end;
278 return 0;
279bad:
280 return -EIO;
281}
282
283
284
285
286
287static int parse_reply_info_trace(void **p, void *end,
288 struct ceph_mds_reply_info_parsed *info,
289 u64 features)
290{
291 int err;
292
293 if (info->head->is_dentry) {
294 err = parse_reply_info_in(p, end, &info->diri, features);
295 if (err < 0)
296 goto out_bad;
297
298 err = parse_reply_info_dir(p, end, &info->dirfrag, features);
299 if (err < 0)
300 goto out_bad;
301
302 ceph_decode_32_safe(p, end, info->dname_len, bad);
303 ceph_decode_need(p, end, info->dname_len, bad);
304 info->dname = *p;
305 *p += info->dname_len;
306
307 err = parse_reply_info_lease(p, end, &info->dlease, features);
308 if (err < 0)
309 goto out_bad;
310 }
311
312 if (info->head->is_target) {
313 err = parse_reply_info_in(p, end, &info->targeti, features);
314 if (err < 0)
315 goto out_bad;
316 }
317
318 if (unlikely(*p != end))
319 goto bad;
320 return 0;
321
322bad:
323 err = -EIO;
324out_bad:
325 pr_err("problem parsing mds trace %d\n", err);
326 return err;
327}
328
329
330
331
332static int parse_reply_info_readdir(void **p, void *end,
333 struct ceph_mds_reply_info_parsed *info,
334 u64 features)
335{
336 u32 num, i = 0;
337 int err;
338
339 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
340 if (err < 0)
341 goto out_bad;
342
343 ceph_decode_need(p, end, sizeof(num) + 2, bad);
344 num = ceph_decode_32(p);
345 {
346 u16 flags = ceph_decode_16(p);
347 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
348 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
349 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
350 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
351 }
352 if (num == 0)
353 goto done;
354
355 BUG_ON(!info->dir_entries);
356 if ((unsigned long)(info->dir_entries + num) >
357 (unsigned long)info->dir_entries + info->dir_buf_size) {
358 pr_err("dir contents are larger than expected\n");
359 WARN_ON(1);
360 goto bad;
361 }
362
363 info->dir_nr = num;
364 while (num) {
365 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
366
367 ceph_decode_32_safe(p, end, rde->name_len, bad);
368 ceph_decode_need(p, end, rde->name_len, bad);
369 rde->name = *p;
370 *p += rde->name_len;
371 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
372
373
374 err = parse_reply_info_lease(p, end, &rde->lease, features);
375 if (err)
376 goto out_bad;
377
378 err = parse_reply_info_in(p, end, &rde->inode, features);
379 if (err < 0)
380 goto out_bad;
381
382 rde->offset = 0;
383 i++;
384 num--;
385 }
386
387done:
388
389 *p = end;
390 return 0;
391
392bad:
393 err = -EIO;
394out_bad:
395 pr_err("problem parsing dir contents %d\n", err);
396 return err;
397}
398
399
400
401
402static int parse_reply_info_filelock(void **p, void *end,
403 struct ceph_mds_reply_info_parsed *info,
404 u64 features)
405{
406 if (*p + sizeof(*info->filelock_reply) > end)
407 goto bad;
408
409 info->filelock_reply = *p;
410
411
412 *p = end;
413 return 0;
414bad:
415 return -EIO;
416}
417
418
419#if BITS_PER_LONG == 64
420
421#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
422
423static int ceph_parse_deleg_inos(void **p, void *end,
424 struct ceph_mds_session *s)
425{
426 u32 sets;
427
428 ceph_decode_32_safe(p, end, sets, bad);
429 dout("got %u sets of delegated inodes\n", sets);
430 while (sets--) {
431 u64 start, len, ino;
432
433 ceph_decode_64_safe(p, end, start, bad);
434 ceph_decode_64_safe(p, end, len, bad);
435 while (len--) {
436 int err = xa_insert(&s->s_delegated_inos, ino = start++,
437 DELEGATED_INO_AVAILABLE,
438 GFP_KERNEL);
439 if (!err) {
440 dout("added delegated inode 0x%llx\n",
441 start - 1);
442 } else if (err == -EBUSY) {
443 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
444 start - 1);
445 } else {
446 return err;
447 }
448 }
449 }
450 return 0;
451bad:
452 return -EIO;
453}
454
455u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
456{
457 unsigned long ino;
458 void *val;
459
460 xa_for_each(&s->s_delegated_inos, ino, val) {
461 val = xa_erase(&s->s_delegated_inos, ino);
462 if (val == DELEGATED_INO_AVAILABLE)
463 return ino;
464 }
465 return 0;
466}
467
468int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
469{
470 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
471 GFP_KERNEL);
472}
473#else
474
475
476
477
478
479static int ceph_parse_deleg_inos(void **p, void *end,
480 struct ceph_mds_session *s)
481{
482 u32 sets;
483
484 ceph_decode_32_safe(p, end, sets, bad);
485 if (sets)
486 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
487 return 0;
488bad:
489 return -EIO;
490}
491
492u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
493{
494 return 0;
495}
496
497int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
498{
499 return 0;
500}
501#endif
502
503
504
505
506static int parse_reply_info_create(void **p, void *end,
507 struct ceph_mds_reply_info_parsed *info,
508 u64 features, struct ceph_mds_session *s)
509{
510 int ret;
511
512 if (features == (u64)-1 ||
513 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
514 if (*p == end) {
515
516 info->has_create_ino = false;
517 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
518 u8 struct_v, struct_compat;
519 u32 len;
520
521 info->has_create_ino = true;
522 ceph_decode_8_safe(p, end, struct_v, bad);
523 ceph_decode_8_safe(p, end, struct_compat, bad);
524 ceph_decode_32_safe(p, end, len, bad);
525 ceph_decode_64_safe(p, end, info->ino, bad);
526 ret = ceph_parse_deleg_inos(p, end, s);
527 if (ret)
528 return ret;
529 } else {
530
531 ceph_decode_64_safe(p, end, info->ino, bad);
532 info->has_create_ino = true;
533 }
534 } else {
535 if (*p != end)
536 goto bad;
537 }
538
539
540 *p = end;
541 return 0;
542bad:
543 return -EIO;
544}
545
546
547
548
549static int parse_reply_info_extra(void **p, void *end,
550 struct ceph_mds_reply_info_parsed *info,
551 u64 features, struct ceph_mds_session *s)
552{
553 u32 op = le32_to_cpu(info->head->op);
554
555 if (op == CEPH_MDS_OP_GETFILELOCK)
556 return parse_reply_info_filelock(p, end, info, features);
557 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
558 return parse_reply_info_readdir(p, end, info, features);
559 else if (op == CEPH_MDS_OP_CREATE)
560 return parse_reply_info_create(p, end, info, features, s);
561 else
562 return -EIO;
563}
564
565
566
567
568static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
569 struct ceph_mds_reply_info_parsed *info,
570 u64 features)
571{
572 void *p, *end;
573 u32 len;
574 int err;
575
576 info->head = msg->front.iov_base;
577 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
578 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
579
580
581 ceph_decode_32_safe(&p, end, len, bad);
582 if (len > 0) {
583 ceph_decode_need(&p, end, len, bad);
584 err = parse_reply_info_trace(&p, p+len, info, features);
585 if (err < 0)
586 goto out_bad;
587 }
588
589
590 ceph_decode_32_safe(&p, end, len, bad);
591 if (len > 0) {
592 ceph_decode_need(&p, end, len, bad);
593 err = parse_reply_info_extra(&p, p+len, info, features, s);
594 if (err < 0)
595 goto out_bad;
596 }
597
598
599 ceph_decode_32_safe(&p, end, len, bad);
600 info->snapblob_len = len;
601 info->snapblob = p;
602 p += len;
603
604 if (p != end)
605 goto bad;
606 return 0;
607
608bad:
609 err = -EIO;
610out_bad:
611 pr_err("mds parse_reply err %d\n", err);
612 return err;
613}
614
615static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
616{
617 if (!info->dir_entries)
618 return;
619 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
620}
621
622
623
624
625
626const char *ceph_session_state_name(int s)
627{
628 switch (s) {
629 case CEPH_MDS_SESSION_NEW: return "new";
630 case CEPH_MDS_SESSION_OPENING: return "opening";
631 case CEPH_MDS_SESSION_OPEN: return "open";
632 case CEPH_MDS_SESSION_HUNG: return "hung";
633 case CEPH_MDS_SESSION_CLOSING: return "closing";
634 case CEPH_MDS_SESSION_CLOSED: return "closed";
635 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
636 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
637 case CEPH_MDS_SESSION_REJECTED: return "rejected";
638 default: return "???";
639 }
640}
641
642struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
643{
644 if (refcount_inc_not_zero(&s->s_ref)) {
645 dout("mdsc get_session %p %d -> %d\n", s,
646 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
647 return s;
648 } else {
649 dout("mdsc get_session %p 0 -- FAIL\n", s);
650 return NULL;
651 }
652}
653
654void ceph_put_mds_session(struct ceph_mds_session *s)
655{
656 dout("mdsc put_session %p %d -> %d\n", s,
657 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
658 if (refcount_dec_and_test(&s->s_ref)) {
659 if (s->s_auth.authorizer)
660 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
661 WARN_ON(mutex_is_locked(&s->s_mutex));
662 xa_destroy(&s->s_delegated_inos);
663 kfree(s);
664 }
665}
666
667
668
669
670struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
671 int mds)
672{
673 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
674 return NULL;
675 return ceph_get_mds_session(mdsc->sessions[mds]);
676}
677
678static bool __have_session(struct ceph_mds_client *mdsc, int mds)
679{
680 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
681 return false;
682 else
683 return true;
684}
685
686static int __verify_registered_session(struct ceph_mds_client *mdsc,
687 struct ceph_mds_session *s)
688{
689 if (s->s_mds >= mdsc->max_sessions ||
690 mdsc->sessions[s->s_mds] != s)
691 return -ENOENT;
692 return 0;
693}
694
695
696
697
698
699static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
700 int mds)
701{
702 struct ceph_mds_session *s;
703
704 if (mds >= mdsc->mdsmap->possible_max_rank)
705 return ERR_PTR(-EINVAL);
706
707 s = kzalloc(sizeof(*s), GFP_NOFS);
708 if (!s)
709 return ERR_PTR(-ENOMEM);
710
711 if (mds >= mdsc->max_sessions) {
712 int newmax = 1 << get_count_order(mds + 1);
713 struct ceph_mds_session **sa;
714
715 dout("%s: realloc to %d\n", __func__, newmax);
716 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
717 if (!sa)
718 goto fail_realloc;
719 if (mdsc->sessions) {
720 memcpy(sa, mdsc->sessions,
721 mdsc->max_sessions * sizeof(void *));
722 kfree(mdsc->sessions);
723 }
724 mdsc->sessions = sa;
725 mdsc->max_sessions = newmax;
726 }
727
728 dout("%s: mds%d\n", __func__, mds);
729 s->s_mdsc = mdsc;
730 s->s_mds = mds;
731 s->s_state = CEPH_MDS_SESSION_NEW;
732 s->s_ttl = 0;
733 s->s_seq = 0;
734 mutex_init(&s->s_mutex);
735
736 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
737
738 spin_lock_init(&s->s_gen_ttl_lock);
739 s->s_cap_gen = 1;
740 s->s_cap_ttl = jiffies - 1;
741
742 spin_lock_init(&s->s_cap_lock);
743 s->s_renew_requested = 0;
744 s->s_renew_seq = 0;
745 INIT_LIST_HEAD(&s->s_caps);
746 s->s_nr_caps = 0;
747 refcount_set(&s->s_ref, 1);
748 INIT_LIST_HEAD(&s->s_waiting);
749 INIT_LIST_HEAD(&s->s_unsafe);
750 xa_init(&s->s_delegated_inos);
751 s->s_num_cap_releases = 0;
752 s->s_cap_reconnect = 0;
753 s->s_cap_iterator = NULL;
754 INIT_LIST_HEAD(&s->s_cap_releases);
755 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
756
757 INIT_LIST_HEAD(&s->s_cap_dirty);
758 INIT_LIST_HEAD(&s->s_cap_flushing);
759
760 mdsc->sessions[mds] = s;
761 atomic_inc(&mdsc->num_sessions);
762 refcount_inc(&s->s_ref);
763
764 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
765 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
766
767 return s;
768
769fail_realloc:
770 kfree(s);
771 return ERR_PTR(-ENOMEM);
772}
773
774
775
776
777static void __unregister_session(struct ceph_mds_client *mdsc,
778 struct ceph_mds_session *s)
779{
780 dout("__unregister_session mds%d %p\n", s->s_mds, s);
781 BUG_ON(mdsc->sessions[s->s_mds] != s);
782 mdsc->sessions[s->s_mds] = NULL;
783 ceph_con_close(&s->s_con);
784 ceph_put_mds_session(s);
785 atomic_dec(&mdsc->num_sessions);
786}
787
788
789
790
791
792
793static void put_request_session(struct ceph_mds_request *req)
794{
795 if (req->r_session) {
796 ceph_put_mds_session(req->r_session);
797 req->r_session = NULL;
798 }
799}
800
801void ceph_mdsc_release_request(struct kref *kref)
802{
803 struct ceph_mds_request *req = container_of(kref,
804 struct ceph_mds_request,
805 r_kref);
806 ceph_mdsc_release_dir_caps_no_check(req);
807 destroy_reply_info(&req->r_reply_info);
808 if (req->r_request)
809 ceph_msg_put(req->r_request);
810 if (req->r_reply)
811 ceph_msg_put(req->r_reply);
812 if (req->r_inode) {
813 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
814
815 ceph_async_iput(req->r_inode);
816 }
817 if (req->r_parent) {
818 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
819 ceph_async_iput(req->r_parent);
820 }
821 ceph_async_iput(req->r_target_inode);
822 if (req->r_dentry)
823 dput(req->r_dentry);
824 if (req->r_old_dentry)
825 dput(req->r_old_dentry);
826 if (req->r_old_dentry_dir) {
827
828
829
830
831
832
833 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
834 CEPH_CAP_PIN);
835 ceph_async_iput(req->r_old_dentry_dir);
836 }
837 kfree(req->r_path1);
838 kfree(req->r_path2);
839 if (req->r_pagelist)
840 ceph_pagelist_release(req->r_pagelist);
841 put_request_session(req);
842 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
843 WARN_ON_ONCE(!list_empty(&req->r_wait));
844 kmem_cache_free(ceph_mds_request_cachep, req);
845}
846
847DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
848
849
850
851
852
853
854static struct ceph_mds_request *
855lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
856{
857 struct ceph_mds_request *req;
858
859 req = lookup_request(&mdsc->request_tree, tid);
860 if (req)
861 ceph_mdsc_get_request(req);
862
863 return req;
864}
865
866
867
868
869
870
871
872static void __register_request(struct ceph_mds_client *mdsc,
873 struct ceph_mds_request *req,
874 struct inode *dir)
875{
876 int ret = 0;
877
878 req->r_tid = ++mdsc->last_tid;
879 if (req->r_num_caps) {
880 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
881 req->r_num_caps);
882 if (ret < 0) {
883 pr_err("__register_request %p "
884 "failed to reserve caps: %d\n", req, ret);
885
886 req->r_err = ret;
887 return;
888 }
889 }
890 dout("__register_request %p tid %lld\n", req, req->r_tid);
891 ceph_mdsc_get_request(req);
892 insert_request(&mdsc->request_tree, req);
893
894 req->r_uid = current_fsuid();
895 req->r_gid = current_fsgid();
896
897 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
898 mdsc->oldest_tid = req->r_tid;
899
900 if (dir) {
901 struct ceph_inode_info *ci = ceph_inode(dir);
902
903 ihold(dir);
904 req->r_unsafe_dir = dir;
905 spin_lock(&ci->i_unsafe_lock);
906 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
907 spin_unlock(&ci->i_unsafe_lock);
908 }
909}
910
911static void __unregister_request(struct ceph_mds_client *mdsc,
912 struct ceph_mds_request *req)
913{
914 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
915
916
917 list_del_init(&req->r_unsafe_item);
918
919 if (req->r_tid == mdsc->oldest_tid) {
920 struct rb_node *p = rb_next(&req->r_node);
921 mdsc->oldest_tid = 0;
922 while (p) {
923 struct ceph_mds_request *next_req =
924 rb_entry(p, struct ceph_mds_request, r_node);
925 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
926 mdsc->oldest_tid = next_req->r_tid;
927 break;
928 }
929 p = rb_next(p);
930 }
931 }
932
933 erase_request(&mdsc->request_tree, req);
934
935 if (req->r_unsafe_dir) {
936 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
937 spin_lock(&ci->i_unsafe_lock);
938 list_del_init(&req->r_unsafe_dir_item);
939 spin_unlock(&ci->i_unsafe_lock);
940 }
941 if (req->r_target_inode &&
942 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
943 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
944 spin_lock(&ci->i_unsafe_lock);
945 list_del_init(&req->r_unsafe_target_item);
946 spin_unlock(&ci->i_unsafe_lock);
947 }
948
949 if (req->r_unsafe_dir) {
950
951 ceph_async_iput(req->r_unsafe_dir);
952 req->r_unsafe_dir = NULL;
953 }
954
955 complete_all(&req->r_safe_completion);
956
957 ceph_mdsc_put_request(req);
958}
959
960
961
962
963
964
965
966
967static struct inode *get_nonsnap_parent(struct dentry *dentry)
968{
969 struct inode *inode = NULL;
970
971 while (dentry && !IS_ROOT(dentry)) {
972 inode = d_inode_rcu(dentry);
973 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
974 break;
975 dentry = dentry->d_parent;
976 }
977 if (inode)
978 inode = igrab(inode);
979 return inode;
980}
981
982
983
984
985
986
987
988
989
990static int __choose_mds(struct ceph_mds_client *mdsc,
991 struct ceph_mds_request *req,
992 bool *random)
993{
994 struct inode *inode;
995 struct ceph_inode_info *ci;
996 struct ceph_cap *cap;
997 int mode = req->r_direct_mode;
998 int mds = -1;
999 u32 hash = req->r_direct_hash;
1000 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1001
1002 if (random)
1003 *random = false;
1004
1005
1006
1007
1008
1009 if (req->r_resend_mds >= 0 &&
1010 (__have_session(mdsc, req->r_resend_mds) ||
1011 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1012 dout("%s using resend_mds mds%d\n", __func__,
1013 req->r_resend_mds);
1014 return req->r_resend_mds;
1015 }
1016
1017 if (mode == USE_RANDOM_MDS)
1018 goto random;
1019
1020 inode = NULL;
1021 if (req->r_inode) {
1022 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1023 inode = req->r_inode;
1024 ihold(inode);
1025 } else {
1026
1027 rcu_read_lock();
1028 inode = get_nonsnap_parent(req->r_dentry);
1029 rcu_read_unlock();
1030 dout("%s using snapdir's parent %p\n", __func__, inode);
1031 }
1032 } else if (req->r_dentry) {
1033
1034 struct dentry *parent;
1035 struct inode *dir;
1036
1037 rcu_read_lock();
1038 parent = READ_ONCE(req->r_dentry->d_parent);
1039 dir = req->r_parent ? : d_inode_rcu(parent);
1040
1041 if (!dir || dir->i_sb != mdsc->fsc->sb) {
1042
1043 inode = d_inode(req->r_dentry);
1044 if (inode)
1045 ihold(inode);
1046 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
1047
1048
1049 inode = get_nonsnap_parent(parent);
1050 dout("%s using nonsnap parent %p\n", __func__, inode);
1051 } else {
1052
1053 inode = d_inode(req->r_dentry);
1054 if (!inode || mode == USE_AUTH_MDS) {
1055
1056 inode = igrab(dir);
1057 hash = ceph_dentry_hash(dir, req->r_dentry);
1058 is_hash = true;
1059 } else {
1060 ihold(inode);
1061 }
1062 }
1063 rcu_read_unlock();
1064 }
1065
1066 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1067 hash, mode);
1068 if (!inode)
1069 goto random;
1070 ci = ceph_inode(inode);
1071
1072 if (is_hash && S_ISDIR(inode->i_mode)) {
1073 struct ceph_inode_frag frag;
1074 int found;
1075
1076 ceph_choose_frag(ci, hash, &frag, &found);
1077 if (found) {
1078 if (mode == USE_ANY_MDS && frag.ndist > 0) {
1079 u8 r;
1080
1081
1082 get_random_bytes(&r, 1);
1083 r %= frag.ndist;
1084 mds = frag.dist[r];
1085 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1086 __func__, inode, ceph_vinop(inode),
1087 frag.frag, mds, (int)r, frag.ndist);
1088 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1089 CEPH_MDS_STATE_ACTIVE &&
1090 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1091 goto out;
1092 }
1093
1094
1095
1096
1097 if (frag.mds >= 0) {
1098
1099 mds = frag.mds;
1100 dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1101 __func__, inode, ceph_vinop(inode),
1102 frag.frag, mds);
1103 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1104 CEPH_MDS_STATE_ACTIVE) {
1105 if (mode == USE_ANY_MDS &&
1106 !ceph_mdsmap_is_laggy(mdsc->mdsmap,
1107 mds))
1108 goto out;
1109 }
1110 }
1111 mode = USE_AUTH_MDS;
1112 }
1113 }
1114
1115 spin_lock(&ci->i_ceph_lock);
1116 cap = NULL;
1117 if (mode == USE_AUTH_MDS)
1118 cap = ci->i_auth_cap;
1119 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1120 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1121 if (!cap) {
1122 spin_unlock(&ci->i_ceph_lock);
1123 ceph_async_iput(inode);
1124 goto random;
1125 }
1126 mds = cap->session->s_mds;
1127 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1128 inode, ceph_vinop(inode), mds,
1129 cap == ci->i_auth_cap ? "auth " : "", cap);
1130 spin_unlock(&ci->i_ceph_lock);
1131out:
1132
1133
1134 ceph_async_iput(inode);
1135 return mds;
1136
1137random:
1138 if (random)
1139 *random = true;
1140
1141 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1142 dout("%s chose random mds%d\n", __func__, mds);
1143 return mds;
1144}
1145
1146
1147
1148
1149
1150static struct ceph_msg *create_session_msg(u32 op, u64 seq)
1151{
1152 struct ceph_msg *msg;
1153 struct ceph_mds_session_head *h;
1154
1155 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1156 false);
1157 if (!msg) {
1158 pr_err("create_session_msg ENOMEM creating msg\n");
1159 return NULL;
1160 }
1161 h = msg->front.iov_base;
1162 h->op = cpu_to_le32(op);
1163 h->seq = cpu_to_le64(seq);
1164
1165 return msg;
1166}
1167
1168static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1169#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1170static void encode_supported_features(void **p, void *end)
1171{
1172 static const size_t count = ARRAY_SIZE(feature_bits);
1173
1174 if (count > 0) {
1175 size_t i;
1176 size_t size = FEATURE_BYTES(count);
1177
1178 BUG_ON(*p + 4 + size > end);
1179 ceph_encode_32(p, size);
1180 memset(*p, 0, size);
1181 for (i = 0; i < count; i++)
1182 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
1183 *p += size;
1184 } else {
1185 BUG_ON(*p + 4 > end);
1186 ceph_encode_32(p, 0);
1187 }
1188}
1189
1190
1191
1192
1193
1194static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1195{
1196 struct ceph_msg *msg;
1197 struct ceph_mds_session_head *h;
1198 int i = -1;
1199 int extra_bytes = 0;
1200 int metadata_key_count = 0;
1201 struct ceph_options *opt = mdsc->fsc->client->options;
1202 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1203 size_t size, count;
1204 void *p, *end;
1205
1206 const char* metadata[][2] = {
1207 {"hostname", mdsc->nodename},
1208 {"kernel_version", init_utsname()->release},
1209 {"entity_id", opt->name ? : ""},
1210 {"root", fsopt->server_path ? : "/"},
1211 {NULL, NULL}
1212 };
1213
1214
1215 extra_bytes = 4;
1216 for (i = 0; metadata[i][0]; ++i) {
1217 extra_bytes += 8 + strlen(metadata[i][0]) +
1218 strlen(metadata[i][1]);
1219 metadata_key_count++;
1220 }
1221
1222
1223 size = 0;
1224 count = ARRAY_SIZE(feature_bits);
1225 if (count > 0)
1226 size = FEATURE_BYTES(count);
1227 extra_bytes += 4 + size;
1228
1229
1230 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1231 GFP_NOFS, false);
1232 if (!msg) {
1233 pr_err("create_session_msg ENOMEM creating msg\n");
1234 return NULL;
1235 }
1236 p = msg->front.iov_base;
1237 end = p + msg->front.iov_len;
1238
1239 h = p;
1240 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1241 h->seq = cpu_to_le64(seq);
1242
1243
1244
1245
1246
1247
1248
1249 msg->hdr.version = cpu_to_le16(3);
1250 msg->hdr.compat_version = cpu_to_le16(1);
1251
1252
1253 p += sizeof(*h);
1254
1255
1256 ceph_encode_32(&p, metadata_key_count);
1257
1258
1259 for (i = 0; metadata[i][0]; ++i) {
1260 size_t const key_len = strlen(metadata[i][0]);
1261 size_t const val_len = strlen(metadata[i][1]);
1262
1263 ceph_encode_32(&p, key_len);
1264 memcpy(p, metadata[i][0], key_len);
1265 p += key_len;
1266 ceph_encode_32(&p, val_len);
1267 memcpy(p, metadata[i][1], val_len);
1268 p += val_len;
1269 }
1270
1271 encode_supported_features(&p, end);
1272 msg->front.iov_len = p - msg->front.iov_base;
1273 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1274
1275 return msg;
1276}
1277
1278
1279
1280
1281
1282
1283static int __open_session(struct ceph_mds_client *mdsc,
1284 struct ceph_mds_session *session)
1285{
1286 struct ceph_msg *msg;
1287 int mstate;
1288 int mds = session->s_mds;
1289
1290
1291 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1292 dout("open_session to mds%d (%s)\n", mds,
1293 ceph_mds_state_name(mstate));
1294 session->s_state = CEPH_MDS_SESSION_OPENING;
1295 session->s_renew_requested = jiffies;
1296
1297
1298 msg = create_session_open_msg(mdsc, session->s_seq);
1299 if (!msg)
1300 return -ENOMEM;
1301 ceph_con_send(&session->s_con, msg);
1302 return 0;
1303}
1304
1305
1306
1307
1308
1309
1310static struct ceph_mds_session *
1311__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1312{
1313 struct ceph_mds_session *session;
1314
1315 session = __ceph_lookup_mds_session(mdsc, target);
1316 if (!session) {
1317 session = register_session(mdsc, target);
1318 if (IS_ERR(session))
1319 return session;
1320 }
1321 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1322 session->s_state == CEPH_MDS_SESSION_CLOSING)
1323 __open_session(mdsc, session);
1324
1325 return session;
1326}
1327
1328struct ceph_mds_session *
1329ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1330{
1331 struct ceph_mds_session *session;
1332
1333 dout("open_export_target_session to mds%d\n", target);
1334
1335 mutex_lock(&mdsc->mutex);
1336 session = __open_export_target_session(mdsc, target);
1337 mutex_unlock(&mdsc->mutex);
1338
1339 return session;
1340}
1341
1342static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1343 struct ceph_mds_session *session)
1344{
1345 struct ceph_mds_info *mi;
1346 struct ceph_mds_session *ts;
1347 int i, mds = session->s_mds;
1348
1349 if (mds >= mdsc->mdsmap->possible_max_rank)
1350 return;
1351
1352 mi = &mdsc->mdsmap->m_info[mds];
1353 dout("open_export_target_sessions for mds%d (%d targets)\n",
1354 session->s_mds, mi->num_export_targets);
1355
1356 for (i = 0; i < mi->num_export_targets; i++) {
1357 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1358 if (!IS_ERR(ts))
1359 ceph_put_mds_session(ts);
1360 }
1361}
1362
1363void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1364 struct ceph_mds_session *session)
1365{
1366 mutex_lock(&mdsc->mutex);
1367 __open_export_target_sessions(mdsc, session);
1368 mutex_unlock(&mdsc->mutex);
1369}
1370
1371
1372
1373
1374
1375static void detach_cap_releases(struct ceph_mds_session *session,
1376 struct list_head *target)
1377{
1378 lockdep_assert_held(&session->s_cap_lock);
1379
1380 list_splice_init(&session->s_cap_releases, target);
1381 session->s_num_cap_releases = 0;
1382 dout("dispose_cap_releases mds%d\n", session->s_mds);
1383}
1384
1385static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1386 struct list_head *dispose)
1387{
1388 while (!list_empty(dispose)) {
1389 struct ceph_cap *cap;
1390
1391 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1392 list_del(&cap->session_caps);
1393 ceph_put_cap(mdsc, cap);
1394 }
1395}
1396
1397static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1398 struct ceph_mds_session *session)
1399{
1400 struct ceph_mds_request *req;
1401 struct rb_node *p;
1402 struct ceph_inode_info *ci;
1403
1404 dout("cleanup_session_requests mds%d\n", session->s_mds);
1405 mutex_lock(&mdsc->mutex);
1406 while (!list_empty(&session->s_unsafe)) {
1407 req = list_first_entry(&session->s_unsafe,
1408 struct ceph_mds_request, r_unsafe_item);
1409 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1410 req->r_tid);
1411 if (req->r_target_inode) {
1412
1413 ci = ceph_inode(req->r_target_inode);
1414 errseq_set(&ci->i_meta_err, -EIO);
1415 }
1416 if (req->r_unsafe_dir) {
1417
1418 ci = ceph_inode(req->r_unsafe_dir);
1419 errseq_set(&ci->i_meta_err, -EIO);
1420 }
1421 __unregister_request(mdsc, req);
1422 }
1423
1424 p = rb_first(&mdsc->request_tree);
1425 while (p) {
1426 req = rb_entry(p, struct ceph_mds_request, r_node);
1427 p = rb_next(p);
1428 if (req->r_session &&
1429 req->r_session->s_mds == session->s_mds)
1430 req->r_attempts = 0;
1431 }
1432 mutex_unlock(&mdsc->mutex);
1433}
1434
1435
1436
1437
1438
1439
1440
1441int ceph_iterate_session_caps(struct ceph_mds_session *session,
1442 int (*cb)(struct inode *, struct ceph_cap *,
1443 void *), void *arg)
1444{
1445 struct list_head *p;
1446 struct ceph_cap *cap;
1447 struct inode *inode, *last_inode = NULL;
1448 struct ceph_cap *old_cap = NULL;
1449 int ret;
1450
1451 dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1452 spin_lock(&session->s_cap_lock);
1453 p = session->s_caps.next;
1454 while (p != &session->s_caps) {
1455 cap = list_entry(p, struct ceph_cap, session_caps);
1456 inode = igrab(&cap->ci->vfs_inode);
1457 if (!inode) {
1458 p = p->next;
1459 continue;
1460 }
1461 session->s_cap_iterator = cap;
1462 spin_unlock(&session->s_cap_lock);
1463
1464 if (last_inode) {
1465
1466
1467 ceph_async_iput(last_inode);
1468 last_inode = NULL;
1469 }
1470 if (old_cap) {
1471 ceph_put_cap(session->s_mdsc, old_cap);
1472 old_cap = NULL;
1473 }
1474
1475 ret = cb(inode, cap, arg);
1476 last_inode = inode;
1477
1478 spin_lock(&session->s_cap_lock);
1479 p = p->next;
1480 if (!cap->ci) {
1481 dout("iterate_session_caps finishing cap %p removal\n",
1482 cap);
1483 BUG_ON(cap->session != session);
1484 cap->session = NULL;
1485 list_del_init(&cap->session_caps);
1486 session->s_nr_caps--;
1487 if (cap->queue_release)
1488 __ceph_queue_cap_release(session, cap);
1489 else
1490 old_cap = cap;
1491 }
1492 if (ret < 0)
1493 goto out;
1494 }
1495 ret = 0;
1496out:
1497 session->s_cap_iterator = NULL;
1498 spin_unlock(&session->s_cap_lock);
1499
1500 ceph_async_iput(last_inode);
1501 if (old_cap)
1502 ceph_put_cap(session->s_mdsc, old_cap);
1503
1504 return ret;
1505}
1506
1507static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1508 void *arg)
1509{
1510 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1511 struct ceph_inode_info *ci = ceph_inode(inode);
1512 LIST_HEAD(to_remove);
1513 bool dirty_dropped = false;
1514 bool invalidate = false;
1515
1516 dout("removing cap %p, ci is %p, inode is %p\n",
1517 cap, ci, &ci->vfs_inode);
1518 spin_lock(&ci->i_ceph_lock);
1519 __ceph_remove_cap(cap, false);
1520 if (!ci->i_auth_cap) {
1521 struct ceph_cap_flush *cf;
1522 struct ceph_mds_client *mdsc = fsc->mdsc;
1523
1524 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1525 if (inode->i_data.nrpages > 0)
1526 invalidate = true;
1527 if (ci->i_wrbuffer_ref > 0)
1528 mapping_set_error(&inode->i_data, -EIO);
1529 }
1530
1531 while (!list_empty(&ci->i_cap_flush_list)) {
1532 cf = list_first_entry(&ci->i_cap_flush_list,
1533 struct ceph_cap_flush, i_list);
1534 list_move(&cf->i_list, &to_remove);
1535 }
1536
1537 spin_lock(&mdsc->cap_dirty_lock);
1538
1539 list_for_each_entry(cf, &to_remove, i_list)
1540 list_del(&cf->g_list);
1541
1542 if (!list_empty(&ci->i_dirty_item)) {
1543 pr_warn_ratelimited(
1544 " dropping dirty %s state for %p %lld\n",
1545 ceph_cap_string(ci->i_dirty_caps),
1546 inode, ceph_ino(inode));
1547 ci->i_dirty_caps = 0;
1548 list_del_init(&ci->i_dirty_item);
1549 dirty_dropped = true;
1550 }
1551 if (!list_empty(&ci->i_flushing_item)) {
1552 pr_warn_ratelimited(
1553 " dropping dirty+flushing %s state for %p %lld\n",
1554 ceph_cap_string(ci->i_flushing_caps),
1555 inode, ceph_ino(inode));
1556 ci->i_flushing_caps = 0;
1557 list_del_init(&ci->i_flushing_item);
1558 mdsc->num_cap_flushing--;
1559 dirty_dropped = true;
1560 }
1561 spin_unlock(&mdsc->cap_dirty_lock);
1562
1563 if (dirty_dropped) {
1564 errseq_set(&ci->i_meta_err, -EIO);
1565
1566 if (ci->i_wrbuffer_ref_head == 0 &&
1567 ci->i_wr_ref == 0 &&
1568 ci->i_dirty_caps == 0 &&
1569 ci->i_flushing_caps == 0) {
1570 ceph_put_snap_context(ci->i_head_snapc);
1571 ci->i_head_snapc = NULL;
1572 }
1573 }
1574
1575 if (atomic_read(&ci->i_filelock_ref) > 0) {
1576
1577 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1578 pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1579 inode, ceph_ino(inode));
1580 }
1581
1582 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1583 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1584 ci->i_prealloc_cap_flush = NULL;
1585 }
1586 }
1587 spin_unlock(&ci->i_ceph_lock);
1588 while (!list_empty(&to_remove)) {
1589 struct ceph_cap_flush *cf;
1590 cf = list_first_entry(&to_remove,
1591 struct ceph_cap_flush, i_list);
1592 list_del(&cf->i_list);
1593 ceph_free_cap_flush(cf);
1594 }
1595
1596 wake_up_all(&ci->i_cap_wq);
1597 if (invalidate)
1598 ceph_queue_invalidate(inode);
1599 if (dirty_dropped)
1600 iput(inode);
1601 return 0;
1602}
1603
1604
1605
1606
1607static void remove_session_caps(struct ceph_mds_session *session)
1608{
1609 struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1610 struct super_block *sb = fsc->sb;
1611 LIST_HEAD(dispose);
1612
1613 dout("remove_session_caps on %p\n", session);
1614 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1615
1616 wake_up_all(&fsc->mdsc->cap_flushing_wq);
1617
1618 spin_lock(&session->s_cap_lock);
1619 if (session->s_nr_caps > 0) {
1620 struct inode *inode;
1621 struct ceph_cap *cap, *prev = NULL;
1622 struct ceph_vino vino;
1623
1624
1625
1626
1627
1628
1629
1630 while (!list_empty(&session->s_caps)) {
1631 cap = list_entry(session->s_caps.next,
1632 struct ceph_cap, session_caps);
1633 if (cap == prev)
1634 break;
1635 prev = cap;
1636 vino = cap->ci->i_vino;
1637 spin_unlock(&session->s_cap_lock);
1638
1639 inode = ceph_find_inode(sb, vino);
1640
1641 ceph_async_iput(inode);
1642
1643 spin_lock(&session->s_cap_lock);
1644 }
1645 }
1646
1647
1648 detach_cap_releases(session, &dispose);
1649
1650 BUG_ON(session->s_nr_caps > 0);
1651 BUG_ON(!list_empty(&session->s_cap_flushing));
1652 spin_unlock(&session->s_cap_lock);
1653 dispose_cap_releases(session->s_mdsc, &dispose);
1654}
1655
1656enum {
1657 RECONNECT,
1658 RENEWCAPS,
1659 FORCE_RO,
1660};
1661
1662
1663
1664
1665
1666
1667
1668static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1669 void *arg)
1670{
1671 struct ceph_inode_info *ci = ceph_inode(inode);
1672 unsigned long ev = (unsigned long)arg;
1673
1674 if (ev == RECONNECT) {
1675 spin_lock(&ci->i_ceph_lock);
1676 ci->i_wanted_max_size = 0;
1677 ci->i_requested_max_size = 0;
1678 spin_unlock(&ci->i_ceph_lock);
1679 } else if (ev == RENEWCAPS) {
1680 if (cap->cap_gen < cap->session->s_cap_gen) {
1681
1682 spin_lock(&ci->i_ceph_lock);
1683 cap->issued = cap->implemented = CEPH_CAP_PIN;
1684 spin_unlock(&ci->i_ceph_lock);
1685 }
1686 } else if (ev == FORCE_RO) {
1687 }
1688 wake_up_all(&ci->i_cap_wq);
1689 return 0;
1690}
1691
1692static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1693{
1694 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1695 ceph_iterate_session_caps(session, wake_up_session_cb,
1696 (void *)(unsigned long)ev);
1697}
1698
1699
1700
1701
1702
1703
1704
1705static int send_renew_caps(struct ceph_mds_client *mdsc,
1706 struct ceph_mds_session *session)
1707{
1708 struct ceph_msg *msg;
1709 int state;
1710
1711 if (time_after_eq(jiffies, session->s_cap_ttl) &&
1712 time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1713 pr_info("mds%d caps stale\n", session->s_mds);
1714 session->s_renew_requested = jiffies;
1715
1716
1717
1718 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1719 if (state < CEPH_MDS_STATE_RECONNECT) {
1720 dout("send_renew_caps ignoring mds%d (%s)\n",
1721 session->s_mds, ceph_mds_state_name(state));
1722 return 0;
1723 }
1724
1725 dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1726 ceph_mds_state_name(state));
1727 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1728 ++session->s_renew_seq);
1729 if (!msg)
1730 return -ENOMEM;
1731 ceph_con_send(&session->s_con, msg);
1732 return 0;
1733}
1734
1735static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1736 struct ceph_mds_session *session, u64 seq)
1737{
1738 struct ceph_msg *msg;
1739
1740 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1741 session->s_mds, ceph_session_state_name(session->s_state), seq);
1742 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1743 if (!msg)
1744 return -ENOMEM;
1745 ceph_con_send(&session->s_con, msg);
1746 return 0;
1747}
1748
1749
1750
1751
1752
1753
1754
1755static void renewed_caps(struct ceph_mds_client *mdsc,
1756 struct ceph_mds_session *session, int is_renew)
1757{
1758 int was_stale;
1759 int wake = 0;
1760
1761 spin_lock(&session->s_cap_lock);
1762 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1763
1764 session->s_cap_ttl = session->s_renew_requested +
1765 mdsc->mdsmap->m_session_timeout*HZ;
1766
1767 if (was_stale) {
1768 if (time_before(jiffies, session->s_cap_ttl)) {
1769 pr_info("mds%d caps renewed\n", session->s_mds);
1770 wake = 1;
1771 } else {
1772 pr_info("mds%d caps still stale\n", session->s_mds);
1773 }
1774 }
1775 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1776 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1777 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1778 spin_unlock(&session->s_cap_lock);
1779
1780 if (wake)
1781 wake_up_session_caps(session, RENEWCAPS);
1782}
1783
1784
1785
1786
1787static int request_close_session(struct ceph_mds_client *mdsc,
1788 struct ceph_mds_session *session)
1789{
1790 struct ceph_msg *msg;
1791
1792 dout("request_close_session mds%d state %s seq %lld\n",
1793 session->s_mds, ceph_session_state_name(session->s_state),
1794 session->s_seq);
1795 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1796 if (!msg)
1797 return -ENOMEM;
1798 ceph_con_send(&session->s_con, msg);
1799 return 1;
1800}
1801
1802
1803
1804
1805static int __close_session(struct ceph_mds_client *mdsc,
1806 struct ceph_mds_session *session)
1807{
1808 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1809 return 0;
1810 session->s_state = CEPH_MDS_SESSION_CLOSING;
1811 return request_close_session(mdsc, session);
1812}
1813
1814static bool drop_negative_children(struct dentry *dentry)
1815{
1816 struct dentry *child;
1817 bool all_negative = true;
1818
1819 if (!d_is_dir(dentry))
1820 goto out;
1821
1822 spin_lock(&dentry->d_lock);
1823 list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1824 if (d_really_is_positive(child)) {
1825 all_negative = false;
1826 break;
1827 }
1828 }
1829 spin_unlock(&dentry->d_lock);
1830
1831 if (all_negative)
1832 shrink_dcache_parent(dentry);
1833out:
1834 return all_negative;
1835}
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1848{
1849 int *remaining = arg;
1850 struct ceph_inode_info *ci = ceph_inode(inode);
1851 int used, wanted, oissued, mine;
1852
1853 if (*remaining <= 0)
1854 return -1;
1855
1856 spin_lock(&ci->i_ceph_lock);
1857 mine = cap->issued | cap->implemented;
1858 used = __ceph_caps_used(ci);
1859 wanted = __ceph_caps_file_wanted(ci);
1860 oissued = __ceph_caps_issued_other(ci, cap);
1861
1862 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1863 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1864 ceph_cap_string(used), ceph_cap_string(wanted));
1865 if (cap == ci->i_auth_cap) {
1866 if (ci->i_dirty_caps || ci->i_flushing_caps ||
1867 !list_empty(&ci->i_cap_snaps))
1868 goto out;
1869 if ((used | wanted) & CEPH_CAP_ANY_WR)
1870 goto out;
1871
1872
1873
1874 if (atomic_read(&ci->i_filelock_ref) > 0)
1875 goto out;
1876 }
1877
1878
1879 if (S_ISREG(inode->i_mode) &&
1880 wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1881 !(oissued & CEPH_CAP_FILE_CACHE)) {
1882 used = 0;
1883 oissued = 0;
1884 }
1885 if ((used | wanted) & ~oissued & mine)
1886 goto out;
1887
1888 if (oissued) {
1889
1890 __ceph_remove_cap(cap, true);
1891 (*remaining)--;
1892 } else {
1893 struct dentry *dentry;
1894
1895 spin_unlock(&ci->i_ceph_lock);
1896 dentry = d_find_any_alias(inode);
1897 if (dentry && drop_negative_children(dentry)) {
1898 int count;
1899 dput(dentry);
1900 d_prune_aliases(inode);
1901 count = atomic_read(&inode->i_count);
1902 if (count == 1)
1903 (*remaining)--;
1904 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1905 inode, cap, count);
1906 } else {
1907 dput(dentry);
1908 }
1909 return 0;
1910 }
1911
1912out:
1913 spin_unlock(&ci->i_ceph_lock);
1914 return 0;
1915}
1916
1917
1918
1919
1920int ceph_trim_caps(struct ceph_mds_client *mdsc,
1921 struct ceph_mds_session *session,
1922 int max_caps)
1923{
1924 int trim_caps = session->s_nr_caps - max_caps;
1925
1926 dout("trim_caps mds%d start: %d / %d, trim %d\n",
1927 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1928 if (trim_caps > 0) {
1929 int remaining = trim_caps;
1930
1931 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
1932 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1933 session->s_mds, session->s_nr_caps, max_caps,
1934 trim_caps - remaining);
1935 }
1936
1937 ceph_flush_cap_releases(mdsc, session);
1938 return 0;
1939}
1940
1941static int check_caps_flush(struct ceph_mds_client *mdsc,
1942 u64 want_flush_tid)
1943{
1944 int ret = 1;
1945
1946 spin_lock(&mdsc->cap_dirty_lock);
1947 if (!list_empty(&mdsc->cap_flush_list)) {
1948 struct ceph_cap_flush *cf =
1949 list_first_entry(&mdsc->cap_flush_list,
1950 struct ceph_cap_flush, g_list);
1951 if (cf->tid <= want_flush_tid) {
1952 dout("check_caps_flush still flushing tid "
1953 "%llu <= %llu\n", cf->tid, want_flush_tid);
1954 ret = 0;
1955 }
1956 }
1957 spin_unlock(&mdsc->cap_dirty_lock);
1958 return ret;
1959}
1960
1961
1962
1963
1964
1965
1966static void wait_caps_flush(struct ceph_mds_client *mdsc,
1967 u64 want_flush_tid)
1968{
1969 dout("check_caps_flush want %llu\n", want_flush_tid);
1970
1971 wait_event(mdsc->cap_flushing_wq,
1972 check_caps_flush(mdsc, want_flush_tid));
1973
1974 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1975}
1976
1977
1978
1979
1980static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1981 struct ceph_mds_session *session)
1982{
1983 struct ceph_msg *msg = NULL;
1984 struct ceph_mds_cap_release *head;
1985 struct ceph_mds_cap_item *item;
1986 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1987 struct ceph_cap *cap;
1988 LIST_HEAD(tmp_list);
1989 int num_cap_releases;
1990 __le32 barrier, *cap_barrier;
1991
1992 down_read(&osdc->lock);
1993 barrier = cpu_to_le32(osdc->epoch_barrier);
1994 up_read(&osdc->lock);
1995
1996 spin_lock(&session->s_cap_lock);
1997again:
1998 list_splice_init(&session->s_cap_releases, &tmp_list);
1999 num_cap_releases = session->s_num_cap_releases;
2000 session->s_num_cap_releases = 0;
2001 spin_unlock(&session->s_cap_lock);
2002
2003 while (!list_empty(&tmp_list)) {
2004 if (!msg) {
2005 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2006 PAGE_SIZE, GFP_NOFS, false);
2007 if (!msg)
2008 goto out_err;
2009 head = msg->front.iov_base;
2010 head->num = cpu_to_le32(0);
2011 msg->front.iov_len = sizeof(*head);
2012
2013 msg->hdr.version = cpu_to_le16(2);
2014 msg->hdr.compat_version = cpu_to_le16(1);
2015 }
2016
2017 cap = list_first_entry(&tmp_list, struct ceph_cap,
2018 session_caps);
2019 list_del(&cap->session_caps);
2020 num_cap_releases--;
2021
2022 head = msg->front.iov_base;
2023 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2024 &head->num);
2025 item = msg->front.iov_base + msg->front.iov_len;
2026 item->ino = cpu_to_le64(cap->cap_ino);
2027 item->cap_id = cpu_to_le64(cap->cap_id);
2028 item->migrate_seq = cpu_to_le32(cap->mseq);
2029 item->seq = cpu_to_le32(cap->issue_seq);
2030 msg->front.iov_len += sizeof(*item);
2031
2032 ceph_put_cap(mdsc, cap);
2033
2034 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2035
2036 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2037 *cap_barrier = barrier;
2038 msg->front.iov_len += sizeof(*cap_barrier);
2039
2040 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2041 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2042 ceph_con_send(&session->s_con, msg);
2043 msg = NULL;
2044 }
2045 }
2046
2047 BUG_ON(num_cap_releases != 0);
2048
2049 spin_lock(&session->s_cap_lock);
2050 if (!list_empty(&session->s_cap_releases))
2051 goto again;
2052 spin_unlock(&session->s_cap_lock);
2053
2054 if (msg) {
2055
2056 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2057 *cap_barrier = barrier;
2058 msg->front.iov_len += sizeof(*cap_barrier);
2059
2060 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2061 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2062 ceph_con_send(&session->s_con, msg);
2063 }
2064 return;
2065out_err:
2066 pr_err("send_cap_releases mds%d, failed to allocate message\n",
2067 session->s_mds);
2068 spin_lock(&session->s_cap_lock);
2069 list_splice(&tmp_list, &session->s_cap_releases);
2070 session->s_num_cap_releases += num_cap_releases;
2071 spin_unlock(&session->s_cap_lock);
2072}
2073
2074static void ceph_cap_release_work(struct work_struct *work)
2075{
2076 struct ceph_mds_session *session =
2077 container_of(work, struct ceph_mds_session, s_cap_release_work);
2078
2079 mutex_lock(&session->s_mutex);
2080 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2081 session->s_state == CEPH_MDS_SESSION_HUNG)
2082 ceph_send_cap_releases(session->s_mdsc, session);
2083 mutex_unlock(&session->s_mutex);
2084 ceph_put_mds_session(session);
2085}
2086
2087void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2088 struct ceph_mds_session *session)
2089{
2090 if (mdsc->stopping)
2091 return;
2092
2093 ceph_get_mds_session(session);
2094 if (queue_work(mdsc->fsc->cap_wq,
2095 &session->s_cap_release_work)) {
2096 dout("cap release work queued\n");
2097 } else {
2098 ceph_put_mds_session(session);
2099 dout("failed to queue cap release work\n");
2100 }
2101}
2102
2103
2104
2105
2106void __ceph_queue_cap_release(struct ceph_mds_session *session,
2107 struct ceph_cap *cap)
2108{
2109 list_add_tail(&cap->session_caps, &session->s_cap_releases);
2110 session->s_num_cap_releases++;
2111
2112 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2113 ceph_flush_cap_releases(session->s_mdsc, session);
2114}
2115
2116static void ceph_cap_reclaim_work(struct work_struct *work)
2117{
2118 struct ceph_mds_client *mdsc =
2119 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2120 int ret = ceph_trim_dentries(mdsc);
2121 if (ret == -EAGAIN)
2122 ceph_queue_cap_reclaim_work(mdsc);
2123}
2124
2125void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2126{
2127 if (mdsc->stopping)
2128 return;
2129
2130 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2131 dout("caps reclaim work queued\n");
2132 } else {
2133 dout("failed to queue caps release work\n");
2134 }
2135}
2136
2137void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2138{
2139 int val;
2140 if (!nr)
2141 return;
2142 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2143 if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2144 atomic_set(&mdsc->cap_reclaim_pending, 0);
2145 ceph_queue_cap_reclaim_work(mdsc);
2146 }
2147}
2148
2149
2150
2151
2152
2153int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2154 struct inode *dir)
2155{
2156 struct ceph_inode_info *ci = ceph_inode(dir);
2157 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2158 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2159 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2160 int order, num_entries;
2161
2162 spin_lock(&ci->i_ceph_lock);
2163 num_entries = ci->i_files + ci->i_subdirs;
2164 spin_unlock(&ci->i_ceph_lock);
2165 num_entries = max(num_entries, 1);
2166 num_entries = min(num_entries, opt->max_readdir);
2167
2168 order = get_order(size * num_entries);
2169 while (order >= 0) {
2170 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2171 __GFP_NOWARN,
2172 order);
2173 if (rinfo->dir_entries)
2174 break;
2175 order--;
2176 }
2177 if (!rinfo->dir_entries)
2178 return -ENOMEM;
2179
2180 num_entries = (PAGE_SIZE << order) / size;
2181 num_entries = min(num_entries, opt->max_readdir);
2182
2183 rinfo->dir_buf_size = PAGE_SIZE << order;
2184 req->r_num_caps = num_entries + 1;
2185 req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2186 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2187 return 0;
2188}
2189
2190
2191
2192
2193struct ceph_mds_request *
2194ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2195{
2196 struct ceph_mds_request *req;
2197
2198 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2199 if (!req)
2200 return ERR_PTR(-ENOMEM);
2201
2202 mutex_init(&req->r_fill_mutex);
2203 req->r_mdsc = mdsc;
2204 req->r_started = jiffies;
2205 req->r_resend_mds = -1;
2206 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2207 INIT_LIST_HEAD(&req->r_unsafe_target_item);
2208 req->r_fmode = -1;
2209 kref_init(&req->r_kref);
2210 RB_CLEAR_NODE(&req->r_node);
2211 INIT_LIST_HEAD(&req->r_wait);
2212 init_completion(&req->r_completion);
2213 init_completion(&req->r_safe_completion);
2214 INIT_LIST_HEAD(&req->r_unsafe_item);
2215
2216 ktime_get_coarse_real_ts64(&req->r_stamp);
2217
2218 req->r_op = op;
2219 req->r_direct_mode = mode;
2220 return req;
2221}
2222
2223
2224
2225
2226
2227
2228static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2229{
2230 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2231 return NULL;
2232 return rb_entry(rb_first(&mdsc->request_tree),
2233 struct ceph_mds_request, r_node);
2234}
2235
2236static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2237{
2238 return mdsc->oldest_tid;
2239}
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2252 int stop_on_nosnap)
2253{
2254 struct dentry *temp;
2255 char *path;
2256 int pos;
2257 unsigned seq;
2258 u64 base;
2259
2260 if (!dentry)
2261 return ERR_PTR(-EINVAL);
2262
2263 path = __getname();
2264 if (!path)
2265 return ERR_PTR(-ENOMEM);
2266retry:
2267 pos = PATH_MAX - 1;
2268 path[pos] = '\0';
2269
2270 seq = read_seqbegin(&rename_lock);
2271 rcu_read_lock();
2272 temp = dentry;
2273 for (;;) {
2274 struct inode *inode;
2275
2276 spin_lock(&temp->d_lock);
2277 inode = d_inode(temp);
2278 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2279 dout("build_path path+%d: %p SNAPDIR\n",
2280 pos, temp);
2281 } else if (stop_on_nosnap && inode && dentry != temp &&
2282 ceph_snap(inode) == CEPH_NOSNAP) {
2283 spin_unlock(&temp->d_lock);
2284 pos++;
2285 break;
2286 } else {
2287 pos -= temp->d_name.len;
2288 if (pos < 0) {
2289 spin_unlock(&temp->d_lock);
2290 break;
2291 }
2292 memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2293 }
2294 spin_unlock(&temp->d_lock);
2295 temp = READ_ONCE(temp->d_parent);
2296
2297
2298 if (IS_ROOT(temp))
2299 break;
2300
2301
2302 if (--pos < 0)
2303 break;
2304
2305 path[pos] = '/';
2306 }
2307 base = ceph_ino(d_inode(temp));
2308 rcu_read_unlock();
2309
2310 if (read_seqretry(&rename_lock, seq))
2311 goto retry;
2312
2313 if (pos < 0) {
2314
2315
2316
2317
2318 pr_warn("build_path did not end path lookup where "
2319 "expected, pos is %d\n", pos);
2320 goto retry;
2321 }
2322
2323 *pbase = base;
2324 *plen = PATH_MAX - 1 - pos;
2325 dout("build_path on %p %d built %llx '%.*s'\n",
2326 dentry, d_count(dentry), base, *plen, path + pos);
2327 return path + pos;
2328}
2329
2330static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2331 const char **ppath, int *ppathlen, u64 *pino,
2332 bool *pfreepath, bool parent_locked)
2333{
2334 char *path;
2335
2336 rcu_read_lock();
2337 if (!dir)
2338 dir = d_inode_rcu(dentry->d_parent);
2339 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2340 *pino = ceph_ino(dir);
2341 rcu_read_unlock();
2342 *ppath = dentry->d_name.name;
2343 *ppathlen = dentry->d_name.len;
2344 return 0;
2345 }
2346 rcu_read_unlock();
2347 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2348 if (IS_ERR(path))
2349 return PTR_ERR(path);
2350 *ppath = path;
2351 *pfreepath = true;
2352 return 0;
2353}
2354
2355static int build_inode_path(struct inode *inode,
2356 const char **ppath, int *ppathlen, u64 *pino,
2357 bool *pfreepath)
2358{
2359 struct dentry *dentry;
2360 char *path;
2361
2362 if (ceph_snap(inode) == CEPH_NOSNAP) {
2363 *pino = ceph_ino(inode);
2364 *ppathlen = 0;
2365 return 0;
2366 }
2367 dentry = d_find_alias(inode);
2368 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2369 dput(dentry);
2370 if (IS_ERR(path))
2371 return PTR_ERR(path);
2372 *ppath = path;
2373 *pfreepath = true;
2374 return 0;
2375}
2376
2377
2378
2379
2380
2381static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2382 struct inode *rdiri, const char *rpath,
2383 u64 rino, const char **ppath, int *pathlen,
2384 u64 *ino, bool *freepath, bool parent_locked)
2385{
2386 int r = 0;
2387
2388 if (rinode) {
2389 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2390 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2391 ceph_snap(rinode));
2392 } else if (rdentry) {
2393 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2394 freepath, parent_locked);
2395 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2396 *ppath);
2397 } else if (rpath || rino) {
2398 *ino = rino;
2399 *ppath = rpath;
2400 *pathlen = rpath ? strlen(rpath) : 0;
2401 dout(" path %.*s\n", *pathlen, rpath);
2402 }
2403
2404 return r;
2405}
2406
2407
2408
2409
2410static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2411 struct ceph_mds_request *req,
2412 int mds, bool drop_cap_releases)
2413{
2414 struct ceph_msg *msg;
2415 struct ceph_mds_request_head *head;
2416 const char *path1 = NULL;
2417 const char *path2 = NULL;
2418 u64 ino1 = 0, ino2 = 0;
2419 int pathlen1 = 0, pathlen2 = 0;
2420 bool freepath1 = false, freepath2 = false;
2421 int len;
2422 u16 releases;
2423 void *p, *end;
2424 int ret;
2425
2426 ret = set_request_path_attr(req->r_inode, req->r_dentry,
2427 req->r_parent, req->r_path1, req->r_ino1.ino,
2428 &path1, &pathlen1, &ino1, &freepath1,
2429 test_bit(CEPH_MDS_R_PARENT_LOCKED,
2430 &req->r_req_flags));
2431 if (ret < 0) {
2432 msg = ERR_PTR(ret);
2433 goto out;
2434 }
2435
2436
2437 ret = set_request_path_attr(NULL, req->r_old_dentry,
2438 req->r_old_dentry_dir,
2439 req->r_path2, req->r_ino2.ino,
2440 &path2, &pathlen2, &ino2, &freepath2, true);
2441 if (ret < 0) {
2442 msg = ERR_PTR(ret);
2443 goto out_free1;
2444 }
2445
2446 len = sizeof(*head) +
2447 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2448 sizeof(struct ceph_timespec);
2449
2450
2451 len += sizeof(struct ceph_mds_request_release) *
2452 (!!req->r_inode_drop + !!req->r_dentry_drop +
2453 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2454 if (req->r_dentry_drop)
2455 len += pathlen1;
2456 if (req->r_old_dentry_drop)
2457 len += pathlen2;
2458
2459 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2460 if (!msg) {
2461 msg = ERR_PTR(-ENOMEM);
2462 goto out_free2;
2463 }
2464
2465 msg->hdr.version = cpu_to_le16(2);
2466 msg->hdr.tid = cpu_to_le64(req->r_tid);
2467
2468 head = msg->front.iov_base;
2469 p = msg->front.iov_base + sizeof(*head);
2470 end = msg->front.iov_base + msg->front.iov_len;
2471
2472 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2473 head->op = cpu_to_le32(req->r_op);
2474 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2475 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2476 head->ino = cpu_to_le64(req->r_deleg_ino);
2477 head->args = req->r_args;
2478
2479 ceph_encode_filepath(&p, end, ino1, path1);
2480 ceph_encode_filepath(&p, end, ino2, path2);
2481
2482
2483 req->r_request_release_offset = p - msg->front.iov_base;
2484
2485
2486 releases = 0;
2487 if (req->r_inode_drop)
2488 releases += ceph_encode_inode_release(&p,
2489 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2490 mds, req->r_inode_drop, req->r_inode_unless,
2491 req->r_op == CEPH_MDS_OP_READDIR);
2492 if (req->r_dentry_drop)
2493 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2494 req->r_parent, mds, req->r_dentry_drop,
2495 req->r_dentry_unless);
2496 if (req->r_old_dentry_drop)
2497 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2498 req->r_old_dentry_dir, mds,
2499 req->r_old_dentry_drop,
2500 req->r_old_dentry_unless);
2501 if (req->r_old_inode_drop)
2502 releases += ceph_encode_inode_release(&p,
2503 d_inode(req->r_old_dentry),
2504 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2505
2506 if (drop_cap_releases) {
2507 releases = 0;
2508 p = msg->front.iov_base + req->r_request_release_offset;
2509 }
2510
2511 head->num_releases = cpu_to_le16(releases);
2512
2513
2514 {
2515 struct ceph_timespec ts;
2516 ceph_encode_timespec64(&ts, &req->r_stamp);
2517 ceph_encode_copy(&p, &ts, sizeof(ts));
2518 }
2519
2520 BUG_ON(p > end);
2521 msg->front.iov_len = p - msg->front.iov_base;
2522 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2523
2524 if (req->r_pagelist) {
2525 struct ceph_pagelist *pagelist = req->r_pagelist;
2526 ceph_msg_data_add_pagelist(msg, pagelist);
2527 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2528 } else {
2529 msg->hdr.data_len = 0;
2530 }
2531
2532 msg->hdr.data_off = cpu_to_le16(0);
2533
2534out_free2:
2535 if (freepath2)
2536 ceph_mdsc_free_path((char *)path2, pathlen2);
2537out_free1:
2538 if (freepath1)
2539 ceph_mdsc_free_path((char *)path1, pathlen1);
2540out:
2541 return msg;
2542}
2543
2544
2545
2546
2547
2548static void complete_request(struct ceph_mds_client *mdsc,
2549 struct ceph_mds_request *req)
2550{
2551 if (req->r_callback)
2552 req->r_callback(mdsc, req);
2553 complete_all(&req->r_completion);
2554}
2555
2556
2557
2558
2559static int __prepare_send_request(struct ceph_mds_client *mdsc,
2560 struct ceph_mds_request *req,
2561 int mds, bool drop_cap_releases)
2562{
2563 struct ceph_mds_request_head *rhead;
2564 struct ceph_msg *msg;
2565 int flags = 0;
2566
2567 req->r_attempts++;
2568 if (req->r_inode) {
2569 struct ceph_cap *cap =
2570 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2571
2572 if (cap)
2573 req->r_sent_on_mseq = cap->mseq;
2574 else
2575 req->r_sent_on_mseq = -1;
2576 }
2577 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2578 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2579
2580 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2581 void *p;
2582
2583
2584
2585
2586
2587
2588 msg = req->r_request;
2589 rhead = msg->front.iov_base;
2590
2591 flags = le32_to_cpu(rhead->flags);
2592 flags |= CEPH_MDS_FLAG_REPLAY;
2593 rhead->flags = cpu_to_le32(flags);
2594
2595 if (req->r_target_inode)
2596 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2597
2598 rhead->num_retry = req->r_attempts - 1;
2599
2600
2601 rhead->num_releases = 0;
2602
2603
2604 p = msg->front.iov_base + req->r_request_release_offset;
2605 {
2606 struct ceph_timespec ts;
2607 ceph_encode_timespec64(&ts, &req->r_stamp);
2608 ceph_encode_copy(&p, &ts, sizeof(ts));
2609 }
2610
2611 msg->front.iov_len = p - msg->front.iov_base;
2612 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2613 return 0;
2614 }
2615
2616 if (req->r_request) {
2617 ceph_msg_put(req->r_request);
2618 req->r_request = NULL;
2619 }
2620 msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2621 if (IS_ERR(msg)) {
2622 req->r_err = PTR_ERR(msg);
2623 return PTR_ERR(msg);
2624 }
2625 req->r_request = msg;
2626
2627 rhead = msg->front.iov_base;
2628 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2629 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2630 flags |= CEPH_MDS_FLAG_REPLAY;
2631 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2632 flags |= CEPH_MDS_FLAG_ASYNC;
2633 if (req->r_parent)
2634 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2635 rhead->flags = cpu_to_le32(flags);
2636 rhead->num_fwd = req->r_num_fwd;
2637 rhead->num_retry = req->r_attempts - 1;
2638
2639 dout(" r_parent = %p\n", req->r_parent);
2640 return 0;
2641}
2642
2643
2644
2645
2646static int __send_request(struct ceph_mds_client *mdsc,
2647 struct ceph_mds_session *session,
2648 struct ceph_mds_request *req,
2649 bool drop_cap_releases)
2650{
2651 int err;
2652
2653 err = __prepare_send_request(mdsc, req, session->s_mds,
2654 drop_cap_releases);
2655 if (!err) {
2656 ceph_msg_get(req->r_request);
2657 ceph_con_send(&session->s_con, req->r_request);
2658 }
2659
2660 return err;
2661}
2662
2663
2664
2665
2666static void __do_request(struct ceph_mds_client *mdsc,
2667 struct ceph_mds_request *req)
2668{
2669 struct ceph_mds_session *session = NULL;
2670 int mds = -1;
2671 int err = 0;
2672 bool random;
2673
2674 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2675 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2676 __unregister_request(mdsc, req);
2677 return;
2678 }
2679
2680 if (req->r_timeout &&
2681 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2682 dout("do_request timed out\n");
2683 err = -ETIMEDOUT;
2684 goto finish;
2685 }
2686 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2687 dout("do_request forced umount\n");
2688 err = -EIO;
2689 goto finish;
2690 }
2691 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2692 if (mdsc->mdsmap_err) {
2693 err = mdsc->mdsmap_err;
2694 dout("do_request mdsmap err %d\n", err);
2695 goto finish;
2696 }
2697 if (mdsc->mdsmap->m_epoch == 0) {
2698 dout("do_request no mdsmap, waiting for map\n");
2699 list_add(&req->r_wait, &mdsc->waiting_for_map);
2700 return;
2701 }
2702 if (!(mdsc->fsc->mount_options->flags &
2703 CEPH_MOUNT_OPT_MOUNTWAIT) &&
2704 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2705 err = -EHOSTUNREACH;
2706 goto finish;
2707 }
2708 }
2709
2710 put_request_session(req);
2711
2712 mds = __choose_mds(mdsc, req, &random);
2713 if (mds < 0 ||
2714 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2715 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2716 err = -EJUKEBOX;
2717 goto finish;
2718 }
2719 dout("do_request no mds or not active, waiting for map\n");
2720 list_add(&req->r_wait, &mdsc->waiting_for_map);
2721 return;
2722 }
2723
2724
2725 session = __ceph_lookup_mds_session(mdsc, mds);
2726 if (!session) {
2727 session = register_session(mdsc, mds);
2728 if (IS_ERR(session)) {
2729 err = PTR_ERR(session);
2730 goto finish;
2731 }
2732 }
2733 req->r_session = ceph_get_mds_session(session);
2734
2735 dout("do_request mds%d session %p state %s\n", mds, session,
2736 ceph_session_state_name(session->s_state));
2737 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2738 session->s_state != CEPH_MDS_SESSION_HUNG) {
2739 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2740 err = -EACCES;
2741 goto out_session;
2742 }
2743
2744
2745
2746
2747
2748 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2749 err = -EJUKEBOX;
2750 goto out_session;
2751 }
2752 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2753 session->s_state == CEPH_MDS_SESSION_CLOSING) {
2754 __open_session(mdsc, session);
2755
2756 if (random)
2757 req->r_resend_mds = mds;
2758 }
2759 list_add(&req->r_wait, &session->s_waiting);
2760 goto out_session;
2761 }
2762
2763
2764 req->r_resend_mds = -1;
2765
2766 if (req->r_request_started == 0)
2767 req->r_request_started = jiffies;
2768
2769 err = __send_request(mdsc, session, req, false);
2770
2771out_session:
2772 ceph_put_mds_session(session);
2773finish:
2774 if (err) {
2775 dout("__do_request early error %d\n", err);
2776 req->r_err = err;
2777 complete_request(mdsc, req);
2778 __unregister_request(mdsc, req);
2779 }
2780 return;
2781}
2782
2783
2784
2785
2786static void __wake_requests(struct ceph_mds_client *mdsc,
2787 struct list_head *head)
2788{
2789 struct ceph_mds_request *req;
2790 LIST_HEAD(tmp_list);
2791
2792 list_splice_init(head, &tmp_list);
2793
2794 while (!list_empty(&tmp_list)) {
2795 req = list_entry(tmp_list.next,
2796 struct ceph_mds_request, r_wait);
2797 list_del_init(&req->r_wait);
2798 dout(" wake request %p tid %llu\n", req, req->r_tid);
2799 __do_request(mdsc, req);
2800 }
2801}
2802
2803
2804
2805
2806
2807static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2808{
2809 struct ceph_mds_request *req;
2810 struct rb_node *p = rb_first(&mdsc->request_tree);
2811
2812 dout("kick_requests mds%d\n", mds);
2813 while (p) {
2814 req = rb_entry(p, struct ceph_mds_request, r_node);
2815 p = rb_next(p);
2816 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2817 continue;
2818 if (req->r_attempts > 0)
2819 continue;
2820 if (req->r_session &&
2821 req->r_session->s_mds == mds) {
2822 dout(" kicking tid %llu\n", req->r_tid);
2823 list_del_init(&req->r_wait);
2824 __do_request(mdsc, req);
2825 }
2826 }
2827}
2828
2829int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2830 struct ceph_mds_request *req)
2831{
2832 int err = 0;
2833
2834
2835 if (req->r_inode)
2836 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2837 if (req->r_parent) {
2838 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2839 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2840 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2841 spin_lock(&ci->i_ceph_lock);
2842 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2843 __ceph_touch_fmode(ci, mdsc, fmode);
2844 spin_unlock(&ci->i_ceph_lock);
2845 ihold(req->r_parent);
2846 }
2847 if (req->r_old_dentry_dir)
2848 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2849 CEPH_CAP_PIN);
2850
2851 if (req->r_inode) {
2852 err = ceph_wait_on_async_create(req->r_inode);
2853 if (err) {
2854 dout("%s: wait for async create returned: %d\n",
2855 __func__, err);
2856 return err;
2857 }
2858 }
2859
2860 if (!err && req->r_old_inode) {
2861 err = ceph_wait_on_async_create(req->r_old_inode);
2862 if (err) {
2863 dout("%s: wait for async create returned: %d\n",
2864 __func__, err);
2865 return err;
2866 }
2867 }
2868
2869 dout("submit_request on %p for inode %p\n", req, dir);
2870 mutex_lock(&mdsc->mutex);
2871 __register_request(mdsc, req, dir);
2872 __do_request(mdsc, req);
2873 err = req->r_err;
2874 mutex_unlock(&mdsc->mutex);
2875 return err;
2876}
2877
2878static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2879 struct ceph_mds_request *req)
2880{
2881 int err;
2882
2883
2884 dout("do_request waiting\n");
2885 if (!req->r_timeout && req->r_wait_for_completion) {
2886 err = req->r_wait_for_completion(mdsc, req);
2887 } else {
2888 long timeleft = wait_for_completion_killable_timeout(
2889 &req->r_completion,
2890 ceph_timeout_jiffies(req->r_timeout));
2891 if (timeleft > 0)
2892 err = 0;
2893 else if (!timeleft)
2894 err = -ETIMEDOUT;
2895 else
2896 err = timeleft;
2897 }
2898 dout("do_request waited, got %d\n", err);
2899 mutex_lock(&mdsc->mutex);
2900
2901
2902 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2903 err = le32_to_cpu(req->r_reply_info.head->result);
2904 } else if (err < 0) {
2905 dout("aborted request %lld with %d\n", req->r_tid, err);
2906
2907
2908
2909
2910
2911
2912 mutex_lock(&req->r_fill_mutex);
2913 req->r_err = err;
2914 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2915 mutex_unlock(&req->r_fill_mutex);
2916
2917 if (req->r_parent &&
2918 (req->r_op & CEPH_MDS_OP_WRITE))
2919 ceph_invalidate_dir_request(req);
2920 } else {
2921 err = req->r_err;
2922 }
2923
2924 mutex_unlock(&mdsc->mutex);
2925 return err;
2926}
2927
2928
2929
2930
2931
2932int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2933 struct inode *dir,
2934 struct ceph_mds_request *req)
2935{
2936 int err;
2937
2938 dout("do_request on %p\n", req);
2939
2940
2941 err = ceph_mdsc_submit_request(mdsc, dir, req);
2942 if (!err)
2943 err = ceph_mdsc_wait_request(mdsc, req);
2944 dout("do_request %p done, result %d\n", req, err);
2945 return err;
2946}
2947
2948
2949
2950
2951
2952void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2953{
2954 struct inode *dir = req->r_parent;
2955 struct inode *old_dir = req->r_old_dentry_dir;
2956
2957 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2958
2959 ceph_dir_clear_complete(dir);
2960 if (old_dir)
2961 ceph_dir_clear_complete(old_dir);
2962 if (req->r_dentry)
2963 ceph_invalidate_dentry_lease(req->r_dentry);
2964 if (req->r_old_dentry)
2965 ceph_invalidate_dentry_lease(req->r_old_dentry);
2966}
2967
2968
2969
2970
2971
2972
2973
2974
2975static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2976{
2977 struct ceph_mds_client *mdsc = session->s_mdsc;
2978 struct ceph_mds_request *req;
2979 struct ceph_mds_reply_head *head = msg->front.iov_base;
2980 struct ceph_mds_reply_info_parsed *rinfo;
2981 struct ceph_snap_realm *realm;
2982 u64 tid;
2983 int err, result;
2984 int mds = session->s_mds;
2985
2986 if (msg->front.iov_len < sizeof(*head)) {
2987 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2988 ceph_msg_dump(msg);
2989 return;
2990 }
2991
2992
2993 tid = le64_to_cpu(msg->hdr.tid);
2994 mutex_lock(&mdsc->mutex);
2995 req = lookup_get_request(mdsc, tid);
2996 if (!req) {
2997 dout("handle_reply on unknown tid %llu\n", tid);
2998 mutex_unlock(&mdsc->mutex);
2999 return;
3000 }
3001 dout("handle_reply %p\n", req);
3002
3003
3004 if (req->r_session != session) {
3005 pr_err("mdsc_handle_reply got %llu on session mds%d"
3006 " not mds%d\n", tid, session->s_mds,
3007 req->r_session ? req->r_session->s_mds : -1);
3008 mutex_unlock(&mdsc->mutex);
3009 goto out;
3010 }
3011
3012
3013 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3014 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3015 pr_warn("got a dup %s reply on %llu from mds%d\n",
3016 head->safe ? "safe" : "unsafe", tid, mds);
3017 mutex_unlock(&mdsc->mutex);
3018 goto out;
3019 }
3020 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3021 pr_warn("got unsafe after safe on %llu from mds%d\n",
3022 tid, mds);
3023 mutex_unlock(&mdsc->mutex);
3024 goto out;
3025 }
3026
3027 result = le32_to_cpu(head->result);
3028
3029
3030
3031
3032
3033
3034
3035
3036 if (result == -ESTALE) {
3037 dout("got ESTALE on request %llu\n", req->r_tid);
3038 req->r_resend_mds = -1;
3039 if (req->r_direct_mode != USE_AUTH_MDS) {
3040 dout("not using auth, setting for that now\n");
3041 req->r_direct_mode = USE_AUTH_MDS;
3042 __do_request(mdsc, req);
3043 mutex_unlock(&mdsc->mutex);
3044 goto out;
3045 } else {
3046 int mds = __choose_mds(mdsc, req, NULL);
3047 if (mds >= 0 && mds != req->r_session->s_mds) {
3048 dout("but auth changed, so resending\n");
3049 __do_request(mdsc, req);
3050 mutex_unlock(&mdsc->mutex);
3051 goto out;
3052 }
3053 }
3054 dout("have to return ESTALE on request %llu\n", req->r_tid);
3055 }
3056
3057
3058 if (head->safe) {
3059 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3060 __unregister_request(mdsc, req);
3061
3062
3063 if (mdsc->stopping && !__get_oldest_req(mdsc))
3064 complete_all(&mdsc->safe_umount_waiters);
3065
3066 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3067
3068
3069
3070
3071
3072
3073
3074 dout("got safe reply %llu, mds%d\n", tid, mds);
3075
3076 mutex_unlock(&mdsc->mutex);
3077 goto out;
3078 }
3079 } else {
3080 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3081 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3082 }
3083
3084 dout("handle_reply tid %lld result %d\n", tid, result);
3085 rinfo = &req->r_reply_info;
3086 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3087 err = parse_reply_info(session, msg, rinfo, (u64)-1);
3088 else
3089 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3090 mutex_unlock(&mdsc->mutex);
3091
3092 mutex_lock(&session->s_mutex);
3093 if (err < 0) {
3094 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3095 ceph_msg_dump(msg);
3096 goto out_err;
3097 }
3098
3099
3100 realm = NULL;
3101 if (rinfo->snapblob_len) {
3102 down_write(&mdsc->snap_rwsem);
3103 ceph_update_snap_trace(mdsc, rinfo->snapblob,
3104 rinfo->snapblob + rinfo->snapblob_len,
3105 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3106 &realm);
3107 downgrade_write(&mdsc->snap_rwsem);
3108 } else {
3109 down_read(&mdsc->snap_rwsem);
3110 }
3111
3112
3113 mutex_lock(&req->r_fill_mutex);
3114 current->journal_info = req;
3115 err = ceph_fill_trace(mdsc->fsc->sb, req);
3116 if (err == 0) {
3117 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3118 req->r_op == CEPH_MDS_OP_LSSNAP))
3119 ceph_readdir_prepopulate(req, req->r_session);
3120 }
3121 current->journal_info = NULL;
3122 mutex_unlock(&req->r_fill_mutex);
3123
3124 up_read(&mdsc->snap_rwsem);
3125 if (realm)
3126 ceph_put_snap_realm(mdsc, realm);
3127
3128 if (err == 0) {
3129 if (req->r_target_inode &&
3130 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3131 struct ceph_inode_info *ci =
3132 ceph_inode(req->r_target_inode);
3133 spin_lock(&ci->i_unsafe_lock);
3134 list_add_tail(&req->r_unsafe_target_item,
3135 &ci->i_unsafe_iops);
3136 spin_unlock(&ci->i_unsafe_lock);
3137 }
3138
3139 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3140 }
3141out_err:
3142 mutex_lock(&mdsc->mutex);
3143 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3144 if (err) {
3145 req->r_err = err;
3146 } else {
3147 req->r_reply = ceph_msg_get(msg);
3148 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3149 }
3150 } else {
3151 dout("reply arrived after request %lld was aborted\n", tid);
3152 }
3153 mutex_unlock(&mdsc->mutex);
3154
3155 mutex_unlock(&session->s_mutex);
3156
3157
3158 complete_request(mdsc, req);
3159out:
3160 ceph_mdsc_put_request(req);
3161 return;
3162}
3163
3164
3165
3166
3167
3168
3169static void handle_forward(struct ceph_mds_client *mdsc,
3170 struct ceph_mds_session *session,
3171 struct ceph_msg *msg)
3172{
3173 struct ceph_mds_request *req;
3174 u64 tid = le64_to_cpu(msg->hdr.tid);
3175 u32 next_mds;
3176 u32 fwd_seq;
3177 int err = -EINVAL;
3178 void *p = msg->front.iov_base;
3179 void *end = p + msg->front.iov_len;
3180
3181 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3182 next_mds = ceph_decode_32(&p);
3183 fwd_seq = ceph_decode_32(&p);
3184
3185 mutex_lock(&mdsc->mutex);
3186 req = lookup_get_request(mdsc, tid);
3187 if (!req) {
3188 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3189 goto out;
3190 }
3191
3192 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3193 dout("forward tid %llu aborted, unregistering\n", tid);
3194 __unregister_request(mdsc, req);
3195 } else if (fwd_seq <= req->r_num_fwd) {
3196 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3197 tid, next_mds, req->r_num_fwd, fwd_seq);
3198 } else {
3199
3200 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3201 BUG_ON(req->r_err);
3202 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3203 req->r_attempts = 0;
3204 req->r_num_fwd = fwd_seq;
3205 req->r_resend_mds = next_mds;
3206 put_request_session(req);
3207 __do_request(mdsc, req);
3208 }
3209 ceph_mdsc_put_request(req);
3210out:
3211 mutex_unlock(&mdsc->mutex);
3212 return;
3213
3214bad:
3215 pr_err("mdsc_handle_forward decode error err=%d\n", err);
3216}
3217
3218static int __decode_session_metadata(void **p, void *end,
3219 bool *blacklisted)
3220{
3221
3222 u32 n;
3223 bool err_str;
3224 ceph_decode_32_safe(p, end, n, bad);
3225 while (n-- > 0) {
3226 u32 len;
3227 ceph_decode_32_safe(p, end, len, bad);
3228 ceph_decode_need(p, end, len, bad);
3229 err_str = !strncmp(*p, "error_string", len);
3230 *p += len;
3231 ceph_decode_32_safe(p, end, len, bad);
3232 ceph_decode_need(p, end, len, bad);
3233 if (err_str && strnstr(*p, "blacklisted", len))
3234 *blacklisted = true;
3235 *p += len;
3236 }
3237 return 0;
3238bad:
3239 return -1;
3240}
3241
3242
3243
3244
3245static void handle_session(struct ceph_mds_session *session,
3246 struct ceph_msg *msg)
3247{
3248 struct ceph_mds_client *mdsc = session->s_mdsc;
3249 int mds = session->s_mds;
3250 int msg_version = le16_to_cpu(msg->hdr.version);
3251 void *p = msg->front.iov_base;
3252 void *end = p + msg->front.iov_len;
3253 struct ceph_mds_session_head *h;
3254 u32 op;
3255 u64 seq, features = 0;
3256 int wake = 0;
3257 bool blacklisted = false;
3258
3259
3260 ceph_decode_need(&p, end, sizeof(*h), bad);
3261 h = p;
3262 p += sizeof(*h);
3263
3264 op = le32_to_cpu(h->op);
3265 seq = le64_to_cpu(h->seq);
3266
3267 if (msg_version >= 3) {
3268 u32 len;
3269
3270 if (__decode_session_metadata(&p, end, &blacklisted) < 0)
3271 goto bad;
3272
3273 ceph_decode_32_safe(&p, end, len, bad);
3274 if (len) {
3275 ceph_decode_64_safe(&p, end, features, bad);
3276 p += len - sizeof(features);
3277 }
3278 }
3279
3280 mutex_lock(&mdsc->mutex);
3281 if (op == CEPH_SESSION_CLOSE) {
3282 ceph_get_mds_session(session);
3283 __unregister_session(mdsc, session);
3284 }
3285
3286 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3287 mutex_unlock(&mdsc->mutex);
3288
3289 mutex_lock(&session->s_mutex);
3290
3291 dout("handle_session mds%d %s %p state %s seq %llu\n",
3292 mds, ceph_session_op_name(op), session,
3293 ceph_session_state_name(session->s_state), seq);
3294
3295 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3296 session->s_state = CEPH_MDS_SESSION_OPEN;
3297 pr_info("mds%d came back\n", session->s_mds);
3298 }
3299
3300 switch (op) {
3301 case CEPH_SESSION_OPEN:
3302 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3303 pr_info("mds%d reconnect success\n", session->s_mds);
3304 session->s_state = CEPH_MDS_SESSION_OPEN;
3305 session->s_features = features;
3306 renewed_caps(mdsc, session, 0);
3307 wake = 1;
3308 if (mdsc->stopping)
3309 __close_session(mdsc, session);
3310 break;
3311
3312 case CEPH_SESSION_RENEWCAPS:
3313 if (session->s_renew_seq == seq)
3314 renewed_caps(mdsc, session, 1);
3315 break;
3316
3317 case CEPH_SESSION_CLOSE:
3318 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3319 pr_info("mds%d reconnect denied\n", session->s_mds);
3320 session->s_state = CEPH_MDS_SESSION_CLOSED;
3321 cleanup_session_requests(mdsc, session);
3322 remove_session_caps(session);
3323 wake = 2;
3324 wake_up_all(&mdsc->session_close_wq);
3325 break;
3326
3327 case CEPH_SESSION_STALE:
3328 pr_info("mds%d caps went stale, renewing\n",
3329 session->s_mds);
3330 spin_lock(&session->s_gen_ttl_lock);
3331 session->s_cap_gen++;
3332 session->s_cap_ttl = jiffies - 1;
3333 spin_unlock(&session->s_gen_ttl_lock);
3334 send_renew_caps(mdsc, session);
3335 break;
3336
3337 case CEPH_SESSION_RECALL_STATE:
3338 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3339 break;
3340
3341 case CEPH_SESSION_FLUSHMSG:
3342 send_flushmsg_ack(mdsc, session, seq);
3343 break;
3344
3345 case CEPH_SESSION_FORCE_RO:
3346 dout("force_session_readonly %p\n", session);
3347 spin_lock(&session->s_cap_lock);
3348 session->s_readonly = true;
3349 spin_unlock(&session->s_cap_lock);
3350 wake_up_session_caps(session, FORCE_RO);
3351 break;
3352
3353 case CEPH_SESSION_REJECT:
3354 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3355 pr_info("mds%d rejected session\n", session->s_mds);
3356 session->s_state = CEPH_MDS_SESSION_REJECTED;
3357 cleanup_session_requests(mdsc, session);
3358 remove_session_caps(session);
3359 if (blacklisted)
3360 mdsc->fsc->blacklisted = true;
3361 wake = 2;
3362 break;
3363
3364 default:
3365 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3366 WARN_ON(1);
3367 }
3368
3369 mutex_unlock(&session->s_mutex);
3370 if (wake) {
3371 mutex_lock(&mdsc->mutex);
3372 __wake_requests(mdsc, &session->s_waiting);
3373 if (wake == 2)
3374 kick_requests(mdsc, mds);
3375 mutex_unlock(&mdsc->mutex);
3376 }
3377 if (op == CEPH_SESSION_CLOSE)
3378 ceph_put_mds_session(session);
3379 return;
3380
3381bad:
3382 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3383 (int)msg->front.iov_len);
3384 ceph_msg_dump(msg);
3385 return;
3386}
3387
3388void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3389{
3390 int dcaps;
3391
3392 dcaps = xchg(&req->r_dir_caps, 0);
3393 if (dcaps) {
3394 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3395 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3396 }
3397}
3398
3399void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3400{
3401 int dcaps;
3402
3403 dcaps = xchg(&req->r_dir_caps, 0);
3404 if (dcaps) {
3405 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3406 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3407 dcaps);
3408 }
3409}
3410
3411
3412
3413
3414static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3415 struct ceph_mds_session *session)
3416{
3417 struct ceph_mds_request *req, *nreq;
3418 struct rb_node *p;
3419
3420 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3421
3422 mutex_lock(&mdsc->mutex);
3423 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3424 __send_request(mdsc, session, req, true);
3425
3426
3427
3428
3429
3430 p = rb_first(&mdsc->request_tree);
3431 while (p) {
3432 req = rb_entry(p, struct ceph_mds_request, r_node);
3433 p = rb_next(p);
3434 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3435 continue;
3436 if (req->r_attempts == 0)
3437 continue;
3438 if (!req->r_session)
3439 continue;
3440 if (req->r_session->s_mds != session->s_mds)
3441 continue;
3442
3443 ceph_mdsc_release_dir_caps_no_check(req);
3444
3445 __send_request(mdsc, session, req, true);
3446 }
3447 mutex_unlock(&mdsc->mutex);
3448}
3449
3450static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3451{
3452 struct ceph_msg *reply;
3453 struct ceph_pagelist *_pagelist;
3454 struct page *page;
3455 __le32 *addr;
3456 int err = -ENOMEM;
3457
3458 if (!recon_state->allow_multi)
3459 return -ENOSPC;
3460
3461
3462 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3463
3464
3465 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3466 if (!_pagelist)
3467 return -ENOMEM;
3468
3469 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3470 if (!reply)
3471 goto fail_msg;
3472
3473
3474 err = ceph_pagelist_encode_32(_pagelist, 0);
3475 if (err < 0)
3476 goto fail;
3477
3478 if (recon_state->nr_caps) {
3479
3480 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3481 if (err)
3482 goto fail;
3483 } else {
3484
3485 err = ceph_pagelist_encode_32(_pagelist, 0);
3486 if (err < 0)
3487 goto fail;
3488 }
3489
3490 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3491 if (err)
3492 goto fail;
3493
3494 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3495 addr = kmap_atomic(page);
3496 if (recon_state->nr_caps) {
3497
3498 *addr = cpu_to_le32(recon_state->nr_caps);
3499 } else {
3500
3501 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3502 }
3503 kunmap_atomic(addr);
3504
3505 reply->hdr.version = cpu_to_le16(5);
3506 reply->hdr.compat_version = cpu_to_le16(4);
3507
3508 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3509 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3510
3511 ceph_con_send(&recon_state->session->s_con, reply);
3512 ceph_pagelist_release(recon_state->pagelist);
3513
3514 recon_state->pagelist = _pagelist;
3515 recon_state->nr_caps = 0;
3516 recon_state->nr_realms = 0;
3517 recon_state->msg_version = 5;
3518 return 0;
3519fail:
3520 ceph_msg_put(reply);
3521fail_msg:
3522 ceph_pagelist_release(_pagelist);
3523 return err;
3524}
3525
3526
3527
3528
3529static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3530 void *arg)
3531{
3532 union {
3533 struct ceph_mds_cap_reconnect v2;
3534 struct ceph_mds_cap_reconnect_v1 v1;
3535 } rec;
3536 struct ceph_inode_info *ci = cap->ci;
3537 struct ceph_reconnect_state *recon_state = arg;
3538 struct ceph_pagelist *pagelist = recon_state->pagelist;
3539 int err;
3540 u64 snap_follows;
3541
3542 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3543 inode, ceph_vinop(inode), cap, cap->cap_id,
3544 ceph_cap_string(cap->issued));
3545
3546 spin_lock(&ci->i_ceph_lock);
3547 cap->seq = 0;
3548 cap->issue_seq = 0;
3549 cap->mseq = 0;
3550 cap->cap_gen = cap->session->s_cap_gen;
3551
3552
3553 if (S_ISDIR(inode->i_mode)) {
3554 if (cap->issued & CEPH_CAP_DIR_CREATE) {
3555 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3556 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3557 }
3558 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3559 }
3560
3561 if (recon_state->msg_version >= 2) {
3562 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3563 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3564 rec.v2.issued = cpu_to_le32(cap->issued);
3565 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3566 rec.v2.pathbase = 0;
3567 rec.v2.flock_len = (__force __le32)
3568 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3569 } else {
3570 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3571 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3572 rec.v1.issued = cpu_to_le32(cap->issued);
3573 rec.v1.size = cpu_to_le64(inode->i_size);
3574 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3575 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3576 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3577 rec.v1.pathbase = 0;
3578 }
3579
3580 if (list_empty(&ci->i_cap_snaps)) {
3581 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3582 } else {
3583 struct ceph_cap_snap *capsnap =
3584 list_first_entry(&ci->i_cap_snaps,
3585 struct ceph_cap_snap, ci_item);
3586 snap_follows = capsnap->follows;
3587 }
3588 spin_unlock(&ci->i_ceph_lock);
3589
3590 if (recon_state->msg_version >= 2) {
3591 int num_fcntl_locks, num_flock_locks;
3592 struct ceph_filelock *flocks = NULL;
3593 size_t struct_len, total_len = sizeof(u64);
3594 u8 struct_v = 0;
3595
3596encode_again:
3597 if (rec.v2.flock_len) {
3598 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3599 } else {
3600 num_fcntl_locks = 0;
3601 num_flock_locks = 0;
3602 }
3603 if (num_fcntl_locks + num_flock_locks > 0) {
3604 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3605 sizeof(struct ceph_filelock),
3606 GFP_NOFS);
3607 if (!flocks) {
3608 err = -ENOMEM;
3609 goto out_err;
3610 }
3611 err = ceph_encode_locks_to_buffer(inode, flocks,
3612 num_fcntl_locks,
3613 num_flock_locks);
3614 if (err) {
3615 kfree(flocks);
3616 flocks = NULL;
3617 if (err == -ENOSPC)
3618 goto encode_again;
3619 goto out_err;
3620 }
3621 } else {
3622 kfree(flocks);
3623 flocks = NULL;
3624 }
3625
3626 if (recon_state->msg_version >= 3) {
3627
3628 total_len += 2 * sizeof(u8) + sizeof(u32);
3629 struct_v = 2;
3630 }
3631
3632
3633
3634 struct_len = 2 * sizeof(u32) +
3635 (num_fcntl_locks + num_flock_locks) *
3636 sizeof(struct ceph_filelock);
3637 rec.v2.flock_len = cpu_to_le32(struct_len);
3638
3639 struct_len += sizeof(u32) + sizeof(rec.v2);
3640
3641 if (struct_v >= 2)
3642 struct_len += sizeof(u64);
3643
3644 total_len += struct_len;
3645
3646 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3647 err = send_reconnect_partial(recon_state);
3648 if (err)
3649 goto out_freeflocks;
3650 pagelist = recon_state->pagelist;
3651 }
3652
3653 err = ceph_pagelist_reserve(pagelist, total_len);
3654 if (err)
3655 goto out_freeflocks;
3656
3657 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3658 if (recon_state->msg_version >= 3) {
3659 ceph_pagelist_encode_8(pagelist, struct_v);
3660 ceph_pagelist_encode_8(pagelist, 1);
3661 ceph_pagelist_encode_32(pagelist, struct_len);
3662 }
3663 ceph_pagelist_encode_string(pagelist, NULL, 0);
3664 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3665 ceph_locks_to_pagelist(flocks, pagelist,
3666 num_fcntl_locks, num_flock_locks);
3667 if (struct_v >= 2)
3668 ceph_pagelist_encode_64(pagelist, snap_follows);
3669out_freeflocks:
3670 kfree(flocks);
3671 } else {
3672 u64 pathbase = 0;
3673 int pathlen = 0;
3674 char *path = NULL;
3675 struct dentry *dentry;
3676
3677 dentry = d_find_alias(inode);
3678 if (dentry) {
3679 path = ceph_mdsc_build_path(dentry,
3680 &pathlen, &pathbase, 0);
3681 dput(dentry);
3682 if (IS_ERR(path)) {
3683 err = PTR_ERR(path);
3684 goto out_err;
3685 }
3686 rec.v1.pathbase = cpu_to_le64(pathbase);
3687 }
3688
3689 err = ceph_pagelist_reserve(pagelist,
3690 sizeof(u64) + sizeof(u32) +
3691 pathlen + sizeof(rec.v1));
3692 if (err) {
3693 goto out_freepath;
3694 }
3695
3696 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3697 ceph_pagelist_encode_string(pagelist, path, pathlen);
3698 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3699out_freepath:
3700 ceph_mdsc_free_path(path, pathlen);
3701 }
3702
3703out_err:
3704 if (err >= 0)
3705 recon_state->nr_caps++;
3706 return err;
3707}
3708
3709static int encode_snap_realms(struct ceph_mds_client *mdsc,
3710 struct ceph_reconnect_state *recon_state)
3711{
3712 struct rb_node *p;
3713 struct ceph_pagelist *pagelist = recon_state->pagelist;
3714 int err = 0;
3715
3716 if (recon_state->msg_version >= 4) {
3717 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3718 if (err < 0)
3719 goto fail;
3720 }
3721
3722
3723
3724
3725
3726
3727 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3728 struct ceph_snap_realm *realm =
3729 rb_entry(p, struct ceph_snap_realm, node);
3730 struct ceph_mds_snaprealm_reconnect sr_rec;
3731
3732 if (recon_state->msg_version >= 4) {
3733 size_t need = sizeof(u8) * 2 + sizeof(u32) +
3734 sizeof(sr_rec);
3735
3736 if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3737 err = send_reconnect_partial(recon_state);
3738 if (err)
3739 goto fail;
3740 pagelist = recon_state->pagelist;
3741 }
3742
3743 err = ceph_pagelist_reserve(pagelist, need);
3744 if (err)
3745 goto fail;
3746
3747 ceph_pagelist_encode_8(pagelist, 1);
3748 ceph_pagelist_encode_8(pagelist, 1);
3749 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3750 }
3751
3752 dout(" adding snap realm %llx seq %lld parent %llx\n",
3753 realm->ino, realm->seq, realm->parent_ino);
3754 sr_rec.ino = cpu_to_le64(realm->ino);
3755 sr_rec.seq = cpu_to_le64(realm->seq);
3756 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3757
3758 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3759 if (err)
3760 goto fail;
3761
3762 recon_state->nr_realms++;
3763 }
3764fail:
3765 return err;
3766}
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778
3779static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3780 struct ceph_mds_session *session)
3781{
3782 struct ceph_msg *reply;
3783 int mds = session->s_mds;
3784 int err = -ENOMEM;
3785 struct ceph_reconnect_state recon_state = {
3786 .session = session,
3787 };
3788 LIST_HEAD(dispose);
3789
3790 pr_info("mds%d reconnect start\n", mds);
3791
3792 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3793 if (!recon_state.pagelist)
3794 goto fail_nopagelist;
3795
3796 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3797 if (!reply)
3798 goto fail_nomsg;
3799
3800 xa_destroy(&session->s_delegated_inos);
3801
3802 mutex_lock(&session->s_mutex);
3803 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3804 session->s_seq = 0;
3805
3806 dout("session %p state %s\n", session,
3807 ceph_session_state_name(session->s_state));
3808
3809 spin_lock(&session->s_gen_ttl_lock);
3810 session->s_cap_gen++;
3811 spin_unlock(&session->s_gen_ttl_lock);
3812
3813 spin_lock(&session->s_cap_lock);
3814
3815 session->s_readonly = 0;
3816
3817
3818
3819
3820
3821 session->s_cap_reconnect = 1;
3822
3823 detach_cap_releases(session, &dispose);
3824 spin_unlock(&session->s_cap_lock);
3825 dispose_cap_releases(mdsc, &dispose);
3826
3827
3828 if (mdsc->fsc->sb->s_root)
3829 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3830
3831 ceph_con_close(&session->s_con);
3832 ceph_con_open(&session->s_con,
3833 CEPH_ENTITY_TYPE_MDS, mds,
3834 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3835
3836
3837 replay_unsafe_requests(mdsc, session);
3838
3839 ceph_early_kick_flushing_caps(mdsc, session);
3840
3841 down_read(&mdsc->snap_rwsem);
3842
3843
3844 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3845 if (err)
3846 goto fail;
3847
3848 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3849 recon_state.msg_version = 3;
3850 recon_state.allow_multi = true;
3851 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3852 recon_state.msg_version = 3;
3853 } else {
3854 recon_state.msg_version = 2;
3855 }
3856
3857 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
3858
3859 spin_lock(&session->s_cap_lock);
3860 session->s_cap_reconnect = 0;
3861 spin_unlock(&session->s_cap_lock);
3862
3863 if (err < 0)
3864 goto fail;
3865
3866
3867 if (mdsc->num_snap_realms) {
3868 size_t total_len =
3869 recon_state.pagelist->length +
3870 mdsc->num_snap_realms *
3871 sizeof(struct ceph_mds_snaprealm_reconnect);
3872 if (recon_state.msg_version >= 4) {
3873
3874 total_len += sizeof(u32);
3875
3876 total_len += mdsc->num_snap_realms *
3877 (2 * sizeof(u8) + sizeof(u32));
3878 }
3879 if (total_len > RECONNECT_MAX_SIZE) {
3880 if (!recon_state.allow_multi) {
3881 err = -ENOSPC;
3882 goto fail;
3883 }
3884 if (recon_state.nr_caps) {
3885 err = send_reconnect_partial(&recon_state);
3886 if (err)
3887 goto fail;
3888 }
3889 recon_state.msg_version = 5;
3890 }
3891 }
3892
3893 err = encode_snap_realms(mdsc, &recon_state);
3894 if (err < 0)
3895 goto fail;
3896
3897 if (recon_state.msg_version >= 5) {
3898 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3899 if (err < 0)
3900 goto fail;
3901 }
3902
3903 if (recon_state.nr_caps || recon_state.nr_realms) {
3904 struct page *page =
3905 list_first_entry(&recon_state.pagelist->head,
3906 struct page, lru);
3907 __le32 *addr = kmap_atomic(page);
3908 if (recon_state.nr_caps) {
3909 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3910 *addr = cpu_to_le32(recon_state.nr_caps);
3911 } else if (recon_state.msg_version >= 4) {
3912 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3913 }
3914 kunmap_atomic(addr);
3915 }
3916
3917 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3918 if (recon_state.msg_version >= 4)
3919 reply->hdr.compat_version = cpu_to_le16(4);
3920
3921 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3922 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3923
3924 ceph_con_send(&session->s_con, reply);
3925
3926 mutex_unlock(&session->s_mutex);
3927
3928 mutex_lock(&mdsc->mutex);
3929 __wake_requests(mdsc, &session->s_waiting);
3930 mutex_unlock(&mdsc->mutex);
3931
3932 up_read(&mdsc->snap_rwsem);
3933 ceph_pagelist_release(recon_state.pagelist);
3934 return;
3935
3936fail:
3937 ceph_msg_put(reply);
3938 up_read(&mdsc->snap_rwsem);
3939 mutex_unlock(&session->s_mutex);
3940fail_nomsg:
3941 ceph_pagelist_release(recon_state.pagelist);
3942fail_nopagelist:
3943 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3944 return;
3945}
3946
3947
3948
3949
3950
3951
3952
3953
3954static void check_new_map(struct ceph_mds_client *mdsc,
3955 struct ceph_mdsmap *newmap,
3956 struct ceph_mdsmap *oldmap)
3957{
3958 int i;
3959 int oldstate, newstate;
3960 struct ceph_mds_session *s;
3961
3962 dout("check_new_map new %u old %u\n",
3963 newmap->m_epoch, oldmap->m_epoch);
3964
3965 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
3966 if (!mdsc->sessions[i])
3967 continue;
3968 s = mdsc->sessions[i];
3969 oldstate = ceph_mdsmap_get_state(oldmap, i);
3970 newstate = ceph_mdsmap_get_state(newmap, i);
3971
3972 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3973 i, ceph_mds_state_name(oldstate),
3974 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3975 ceph_mds_state_name(newstate),
3976 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3977 ceph_session_state_name(s->s_state));
3978
3979 if (i >= newmap->possible_max_rank) {
3980
3981 ceph_get_mds_session(s);
3982 __unregister_session(mdsc, s);
3983 __wake_requests(mdsc, &s->s_waiting);
3984 mutex_unlock(&mdsc->mutex);
3985
3986 mutex_lock(&s->s_mutex);
3987 cleanup_session_requests(mdsc, s);
3988 remove_session_caps(s);
3989 mutex_unlock(&s->s_mutex);
3990
3991 ceph_put_mds_session(s);
3992
3993 mutex_lock(&mdsc->mutex);
3994 kick_requests(mdsc, i);
3995 continue;
3996 }
3997
3998 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
3999 ceph_mdsmap_get_addr(newmap, i),
4000 sizeof(struct ceph_entity_addr))) {
4001
4002 mutex_unlock(&mdsc->mutex);
4003 mutex_lock(&s->s_mutex);
4004 mutex_lock(&mdsc->mutex);
4005 ceph_con_close(&s->s_con);
4006 mutex_unlock(&s->s_mutex);
4007 s->s_state = CEPH_MDS_SESSION_RESTARTING;
4008 } else if (oldstate == newstate) {
4009 continue;
4010 }
4011
4012
4013
4014
4015 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4016 newstate >= CEPH_MDS_STATE_RECONNECT) {
4017 mutex_unlock(&mdsc->mutex);
4018 send_mds_reconnect(mdsc, s);
4019 mutex_lock(&mdsc->mutex);
4020 }
4021
4022
4023
4024
4025 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4026 newstate >= CEPH_MDS_STATE_ACTIVE) {
4027 if (oldstate != CEPH_MDS_STATE_CREATING &&
4028 oldstate != CEPH_MDS_STATE_STARTING)
4029 pr_info("mds%d recovery completed\n", s->s_mds);
4030 kick_requests(mdsc, i);
4031 mutex_unlock(&mdsc->mutex);
4032 mutex_lock(&s->s_mutex);
4033 mutex_lock(&mdsc->mutex);
4034 ceph_kick_flushing_caps(mdsc, s);
4035 mutex_unlock(&s->s_mutex);
4036 wake_up_session_caps(s, RECONNECT);
4037 }
4038 }
4039
4040 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4041 s = mdsc->sessions[i];
4042 if (!s)
4043 continue;
4044 if (!ceph_mdsmap_is_laggy(newmap, i))
4045 continue;
4046 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4047 s->s_state == CEPH_MDS_SESSION_HUNG ||
4048 s->s_state == CEPH_MDS_SESSION_CLOSING) {
4049 dout(" connecting to export targets of laggy mds%d\n",
4050 i);
4051 __open_export_target_sessions(mdsc, s);
4052 }
4053 }
4054}
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4066{
4067 struct ceph_dentry_info *di = ceph_dentry(dentry);
4068
4069 ceph_put_mds_session(di->lease_session);
4070 di->lease_session = NULL;
4071}
4072
4073static void handle_lease(struct ceph_mds_client *mdsc,
4074 struct ceph_mds_session *session,
4075 struct ceph_msg *msg)
4076{
4077 struct super_block *sb = mdsc->fsc->sb;
4078 struct inode *inode;
4079 struct dentry *parent, *dentry;
4080 struct ceph_dentry_info *di;
4081 int mds = session->s_mds;
4082 struct ceph_mds_lease *h = msg->front.iov_base;
4083 u32 seq;
4084 struct ceph_vino vino;
4085 struct qstr dname;
4086 int release = 0;
4087
4088 dout("handle_lease from mds%d\n", mds);
4089
4090
4091 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4092 goto bad;
4093 vino.ino = le64_to_cpu(h->ino);
4094 vino.snap = CEPH_NOSNAP;
4095 seq = le32_to_cpu(h->seq);
4096 dname.len = get_unaligned_le32(h + 1);
4097 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4098 goto bad;
4099 dname.name = (void *)(h + 1) + sizeof(u32);
4100
4101
4102 inode = ceph_find_inode(sb, vino);
4103 dout("handle_lease %s, ino %llx %p %.*s\n",
4104 ceph_lease_op_name(h->action), vino.ino, inode,
4105 dname.len, dname.name);
4106
4107 mutex_lock(&session->s_mutex);
4108 session->s_seq++;
4109
4110 if (!inode) {
4111 dout("handle_lease no inode %llx\n", vino.ino);
4112 goto release;
4113 }
4114
4115
4116 parent = d_find_alias(inode);
4117 if (!parent) {
4118 dout("no parent dentry on inode %p\n", inode);
4119 WARN_ON(1);
4120 goto release;
4121 }
4122 dname.hash = full_name_hash(parent, dname.name, dname.len);
4123 dentry = d_lookup(parent, &dname);
4124 dput(parent);
4125 if (!dentry)
4126 goto release;
4127
4128 spin_lock(&dentry->d_lock);
4129 di = ceph_dentry(dentry);
4130 switch (h->action) {
4131 case CEPH_MDS_LEASE_REVOKE:
4132 if (di->lease_session == session) {
4133 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4134 h->seq = cpu_to_le32(di->lease_seq);
4135 __ceph_mdsc_drop_dentry_lease(dentry);
4136 }
4137 release = 1;
4138 break;
4139
4140 case CEPH_MDS_LEASE_RENEW:
4141 if (di->lease_session == session &&
4142 di->lease_gen == session->s_cap_gen &&
4143 di->lease_renew_from &&
4144 di->lease_renew_after == 0) {
4145 unsigned long duration =
4146 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4147
4148 di->lease_seq = seq;
4149 di->time = di->lease_renew_from + duration;
4150 di->lease_renew_after = di->lease_renew_from +
4151 (duration >> 1);
4152 di->lease_renew_from = 0;
4153 }
4154 break;
4155 }
4156 spin_unlock(&dentry->d_lock);
4157 dput(dentry);
4158
4159 if (!release)
4160 goto out;
4161
4162release:
4163
4164 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4165 ceph_msg_get(msg);
4166 ceph_con_send(&session->s_con, msg);
4167
4168out:
4169 mutex_unlock(&session->s_mutex);
4170
4171 ceph_async_iput(inode);
4172 return;
4173
4174bad:
4175 pr_err("corrupt lease message\n");
4176 ceph_msg_dump(msg);
4177}
4178
4179void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4180 struct dentry *dentry, char action,
4181 u32 seq)
4182{
4183 struct ceph_msg *msg;
4184 struct ceph_mds_lease *lease;
4185 struct inode *dir;
4186 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4187
4188 dout("lease_send_msg identry %p %s to mds%d\n",
4189 dentry, ceph_lease_op_name(action), session->s_mds);
4190
4191 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4192 if (!msg)
4193 return;
4194 lease = msg->front.iov_base;
4195 lease->action = action;
4196 lease->seq = cpu_to_le32(seq);
4197
4198 spin_lock(&dentry->d_lock);
4199 dir = d_inode(dentry->d_parent);
4200 lease->ino = cpu_to_le64(ceph_ino(dir));
4201 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4202
4203 put_unaligned_le32(dentry->d_name.len, lease + 1);
4204 memcpy((void *)(lease + 1) + 4,
4205 dentry->d_name.name, dentry->d_name.len);
4206 spin_unlock(&dentry->d_lock);
4207
4208
4209
4210
4211
4212 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4213
4214 ceph_con_send(&session->s_con, msg);
4215}
4216
4217
4218
4219
4220static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4221{
4222 int i;
4223
4224 mutex_lock(&mdsc->mutex);
4225 for (i = 0; i < mdsc->max_sessions; i++) {
4226 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4227 if (!s)
4228 continue;
4229 mutex_unlock(&mdsc->mutex);
4230 mutex_lock(&s->s_mutex);
4231 mutex_unlock(&s->s_mutex);
4232 ceph_put_mds_session(s);
4233 mutex_lock(&mdsc->mutex);
4234 }
4235 mutex_unlock(&mdsc->mutex);
4236}
4237
4238static void maybe_recover_session(struct ceph_mds_client *mdsc)
4239{
4240 struct ceph_fs_client *fsc = mdsc->fsc;
4241
4242 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4243 return;
4244
4245 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4246 return;
4247
4248 if (!READ_ONCE(fsc->blacklisted))
4249 return;
4250
4251 if (fsc->last_auto_reconnect &&
4252 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4253 return;
4254
4255 pr_info("auto reconnect after blacklisted\n");
4256 fsc->last_auto_reconnect = jiffies;
4257 ceph_force_reconnect(fsc->sb);
4258}
4259
4260
4261
4262
4263static void schedule_delayed(struct ceph_mds_client *mdsc)
4264{
4265 int delay = 5;
4266 unsigned hz = round_jiffies_relative(HZ * delay);
4267 schedule_delayed_work(&mdsc->delayed_work, hz);
4268}
4269
4270static void delayed_work(struct work_struct *work)
4271{
4272 int i;
4273 struct ceph_mds_client *mdsc =
4274 container_of(work, struct ceph_mds_client, delayed_work.work);
4275 int renew_interval;
4276 int renew_caps;
4277
4278 dout("mdsc delayed_work\n");
4279
4280 mutex_lock(&mdsc->mutex);
4281 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4282 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4283 mdsc->last_renew_caps);
4284 if (renew_caps)
4285 mdsc->last_renew_caps = jiffies;
4286
4287 for (i = 0; i < mdsc->max_sessions; i++) {
4288 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4289 if (!s)
4290 continue;
4291 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4292 dout("resending session close request for mds%d\n",
4293 s->s_mds);
4294 request_close_session(mdsc, s);
4295 ceph_put_mds_session(s);
4296 continue;
4297 }
4298 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4299 if (s->s_state == CEPH_MDS_SESSION_OPEN) {
4300 s->s_state = CEPH_MDS_SESSION_HUNG;
4301 pr_info("mds%d hung\n", s->s_mds);
4302 }
4303 }
4304 if (s->s_state == CEPH_MDS_SESSION_NEW ||
4305 s->s_state == CEPH_MDS_SESSION_RESTARTING ||
4306 s->s_state == CEPH_MDS_SESSION_REJECTED) {
4307
4308 ceph_put_mds_session(s);
4309 continue;
4310 }
4311 mutex_unlock(&mdsc->mutex);
4312
4313 mutex_lock(&s->s_mutex);
4314 if (renew_caps)
4315 send_renew_caps(mdsc, s);
4316 else
4317 ceph_con_keepalive(&s->s_con);
4318 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4319 s->s_state == CEPH_MDS_SESSION_HUNG)
4320 ceph_send_cap_releases(mdsc, s);
4321 mutex_unlock(&s->s_mutex);
4322 ceph_put_mds_session(s);
4323
4324 mutex_lock(&mdsc->mutex);
4325 }
4326 mutex_unlock(&mdsc->mutex);
4327
4328 ceph_check_delayed_caps(mdsc);
4329
4330 ceph_queue_cap_reclaim_work(mdsc);
4331
4332 ceph_trim_snapid_map(mdsc);
4333
4334 maybe_recover_session(mdsc);
4335
4336 schedule_delayed(mdsc);
4337}
4338
4339int ceph_mdsc_init(struct ceph_fs_client *fsc)
4340
4341{
4342 struct ceph_mds_client *mdsc;
4343
4344 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4345 if (!mdsc)
4346 return -ENOMEM;
4347 mdsc->fsc = fsc;
4348 mutex_init(&mdsc->mutex);
4349 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4350 if (!mdsc->mdsmap) {
4351 kfree(mdsc);
4352 return -ENOMEM;
4353 }
4354
4355 fsc->mdsc = mdsc;
4356 init_completion(&mdsc->safe_umount_waiters);
4357 init_waitqueue_head(&mdsc->session_close_wq);
4358 INIT_LIST_HEAD(&mdsc->waiting_for_map);
4359 mdsc->sessions = NULL;
4360 atomic_set(&mdsc->num_sessions, 0);
4361 mdsc->max_sessions = 0;
4362 mdsc->stopping = 0;
4363 atomic64_set(&mdsc->quotarealms_count, 0);
4364 mdsc->quotarealms_inodes = RB_ROOT;
4365 mutex_init(&mdsc->quotarealms_inodes_mutex);
4366 mdsc->last_snap_seq = 0;
4367 init_rwsem(&mdsc->snap_rwsem);
4368 mdsc->snap_realms = RB_ROOT;
4369 INIT_LIST_HEAD(&mdsc->snap_empty);
4370 mdsc->num_snap_realms = 0;
4371 spin_lock_init(&mdsc->snap_empty_lock);
4372 mdsc->last_tid = 0;
4373 mdsc->oldest_tid = 0;
4374 mdsc->request_tree = RB_ROOT;
4375 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4376 mdsc->last_renew_caps = jiffies;
4377 INIT_LIST_HEAD(&mdsc->cap_delay_list);
4378 INIT_LIST_HEAD(&mdsc->cap_wait_list);
4379 spin_lock_init(&mdsc->cap_delay_lock);
4380 INIT_LIST_HEAD(&mdsc->snap_flush_list);
4381 spin_lock_init(&mdsc->snap_flush_lock);
4382 mdsc->last_cap_flush_tid = 1;
4383 INIT_LIST_HEAD(&mdsc->cap_flush_list);
4384 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4385 mdsc->num_cap_flushing = 0;
4386 spin_lock_init(&mdsc->cap_dirty_lock);
4387 init_waitqueue_head(&mdsc->cap_flushing_wq);
4388 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4389 atomic_set(&mdsc->cap_reclaim_pending, 0);
4390
4391 spin_lock_init(&mdsc->dentry_list_lock);
4392 INIT_LIST_HEAD(&mdsc->dentry_leases);
4393 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4394
4395 ceph_caps_init(mdsc);
4396 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4397
4398 spin_lock_init(&mdsc->snapid_map_lock);
4399 mdsc->snapid_map_tree = RB_ROOT;
4400 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4401
4402 init_rwsem(&mdsc->pool_perm_rwsem);
4403 mdsc->pool_perm_tree = RB_ROOT;
4404
4405 strscpy(mdsc->nodename, utsname()->nodename,
4406 sizeof(mdsc->nodename));
4407 return 0;
4408}
4409
4410
4411
4412
4413
4414static void wait_requests(struct ceph_mds_client *mdsc)
4415{
4416 struct ceph_options *opts = mdsc->fsc->client->options;
4417 struct ceph_mds_request *req;
4418
4419 mutex_lock(&mdsc->mutex);
4420 if (__get_oldest_req(mdsc)) {
4421 mutex_unlock(&mdsc->mutex);
4422
4423 dout("wait_requests waiting for requests\n");
4424 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4425 ceph_timeout_jiffies(opts->mount_timeout));
4426
4427
4428 mutex_lock(&mdsc->mutex);
4429 while ((req = __get_oldest_req(mdsc))) {
4430 dout("wait_requests timed out on tid %llu\n",
4431 req->r_tid);
4432 list_del_init(&req->r_wait);
4433 __unregister_request(mdsc, req);
4434 }
4435 }
4436 mutex_unlock(&mdsc->mutex);
4437 dout("wait_requests done\n");
4438}
4439
4440
4441
4442
4443
4444void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4445{
4446 dout("pre_umount\n");
4447 mdsc->stopping = 1;
4448
4449 lock_unlock_sessions(mdsc);
4450 ceph_flush_dirty_caps(mdsc);
4451 wait_requests(mdsc);
4452
4453
4454
4455
4456
4457 ceph_msgr_flush();
4458
4459 ceph_cleanup_quotarealms_inodes(mdsc);
4460}
4461
4462
4463
4464
4465static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4466{
4467 struct ceph_mds_request *req = NULL, *nextreq;
4468 struct rb_node *n;
4469
4470 mutex_lock(&mdsc->mutex);
4471 dout("wait_unsafe_requests want %lld\n", want_tid);
4472restart:
4473 req = __get_oldest_req(mdsc);
4474 while (req && req->r_tid <= want_tid) {
4475
4476 n = rb_next(&req->r_node);
4477 if (n)
4478 nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4479 else
4480 nextreq = NULL;
4481 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4482 (req->r_op & CEPH_MDS_OP_WRITE)) {
4483
4484 ceph_mdsc_get_request(req);
4485 if (nextreq)
4486 ceph_mdsc_get_request(nextreq);
4487 mutex_unlock(&mdsc->mutex);
4488 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
4489 req->r_tid, want_tid);
4490 wait_for_completion(&req->r_safe_completion);
4491 mutex_lock(&mdsc->mutex);
4492 ceph_mdsc_put_request(req);
4493 if (!nextreq)
4494 break;
4495 if (RB_EMPTY_NODE(&nextreq->r_node)) {
4496
4497 ceph_mdsc_put_request(nextreq);
4498 goto restart;
4499 }
4500 ceph_mdsc_put_request(nextreq);
4501 }
4502 req = nextreq;
4503 }
4504 mutex_unlock(&mdsc->mutex);
4505 dout("wait_unsafe_requests done\n");
4506}
4507
4508void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4509{
4510 u64 want_tid, want_flush;
4511
4512 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4513 return;
4514
4515 dout("sync\n");
4516 mutex_lock(&mdsc->mutex);
4517 want_tid = mdsc->last_tid;
4518 mutex_unlock(&mdsc->mutex);
4519
4520 ceph_flush_dirty_caps(mdsc);
4521 spin_lock(&mdsc->cap_dirty_lock);
4522 want_flush = mdsc->last_cap_flush_tid;
4523 if (!list_empty(&mdsc->cap_flush_list)) {
4524 struct ceph_cap_flush *cf =
4525 list_last_entry(&mdsc->cap_flush_list,
4526 struct ceph_cap_flush, g_list);
4527 cf->wake = true;
4528 }
4529 spin_unlock(&mdsc->cap_dirty_lock);
4530
4531 dout("sync want tid %lld flush_seq %lld\n",
4532 want_tid, want_flush);
4533
4534 wait_unsafe_requests(mdsc, want_tid);
4535 wait_caps_flush(mdsc, want_flush);
4536}
4537
4538
4539
4540
4541static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4542{
4543 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4544 return true;
4545 return atomic_read(&mdsc->num_sessions) <= skipped;
4546}
4547
4548
4549
4550
4551void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4552{
4553 struct ceph_options *opts = mdsc->fsc->client->options;
4554 struct ceph_mds_session *session;
4555 int i;
4556 int skipped = 0;
4557
4558 dout("close_sessions\n");
4559
4560
4561 mutex_lock(&mdsc->mutex);
4562 for (i = 0; i < mdsc->max_sessions; i++) {
4563 session = __ceph_lookup_mds_session(mdsc, i);
4564 if (!session)
4565 continue;
4566 mutex_unlock(&mdsc->mutex);
4567 mutex_lock(&session->s_mutex);
4568 if (__close_session(mdsc, session) <= 0)
4569 skipped++;
4570 mutex_unlock(&session->s_mutex);
4571 ceph_put_mds_session(session);
4572 mutex_lock(&mdsc->mutex);
4573 }
4574 mutex_unlock(&mdsc->mutex);
4575
4576 dout("waiting for sessions to close\n");
4577 wait_event_timeout(mdsc->session_close_wq,
4578 done_closing_sessions(mdsc, skipped),
4579 ceph_timeout_jiffies(opts->mount_timeout));
4580
4581
4582 mutex_lock(&mdsc->mutex);
4583 for (i = 0; i < mdsc->max_sessions; i++) {
4584 if (mdsc->sessions[i]) {
4585 session = ceph_get_mds_session(mdsc->sessions[i]);
4586 __unregister_session(mdsc, session);
4587 mutex_unlock(&mdsc->mutex);
4588 mutex_lock(&session->s_mutex);
4589 remove_session_caps(session);
4590 mutex_unlock(&session->s_mutex);
4591 ceph_put_mds_session(session);
4592 mutex_lock(&mdsc->mutex);
4593 }
4594 }
4595 WARN_ON(!list_empty(&mdsc->cap_delay_list));
4596 mutex_unlock(&mdsc->mutex);
4597
4598 ceph_cleanup_snapid_map(mdsc);
4599 ceph_cleanup_empty_realms(mdsc);
4600
4601 cancel_work_sync(&mdsc->cap_reclaim_work);
4602 cancel_delayed_work_sync(&mdsc->delayed_work);
4603
4604 dout("stopped\n");
4605}
4606
4607void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4608{
4609 struct ceph_mds_session *session;
4610 int mds;
4611
4612 dout("force umount\n");
4613
4614 mutex_lock(&mdsc->mutex);
4615 for (mds = 0; mds < mdsc->max_sessions; mds++) {
4616 session = __ceph_lookup_mds_session(mdsc, mds);
4617 if (!session)
4618 continue;
4619
4620 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4621 __unregister_session(mdsc, session);
4622 __wake_requests(mdsc, &session->s_waiting);
4623 mutex_unlock(&mdsc->mutex);
4624
4625 mutex_lock(&session->s_mutex);
4626 __close_session(mdsc, session);
4627 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4628 cleanup_session_requests(mdsc, session);
4629 remove_session_caps(session);
4630 }
4631 mutex_unlock(&session->s_mutex);
4632 ceph_put_mds_session(session);
4633
4634 mutex_lock(&mdsc->mutex);
4635 kick_requests(mdsc, mds);
4636 }
4637 __wake_requests(mdsc, &mdsc->waiting_for_map);
4638 mutex_unlock(&mdsc->mutex);
4639}
4640
4641static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4642{
4643 dout("stop\n");
4644 cancel_delayed_work_sync(&mdsc->delayed_work);
4645 if (mdsc->mdsmap)
4646 ceph_mdsmap_destroy(mdsc->mdsmap);
4647 kfree(mdsc->sessions);
4648 ceph_caps_finalize(mdsc);
4649 ceph_pool_perm_destroy(mdsc);
4650}
4651
4652void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4653{
4654 struct ceph_mds_client *mdsc = fsc->mdsc;
4655 dout("mdsc_destroy %p\n", mdsc);
4656
4657 if (!mdsc)
4658 return;
4659
4660
4661 ceph_msgr_flush();
4662
4663 ceph_mdsc_stop(mdsc);
4664
4665 fsc->mdsc = NULL;
4666 kfree(mdsc);
4667 dout("mdsc_destroy %p done\n", mdsc);
4668}
4669
4670void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4671{
4672 struct ceph_fs_client *fsc = mdsc->fsc;
4673 const char *mds_namespace = fsc->mount_options->mds_namespace;
4674 void *p = msg->front.iov_base;
4675 void *end = p + msg->front.iov_len;
4676 u32 epoch;
4677 u32 map_len;
4678 u32 num_fs;
4679 u32 mount_fscid = (u32)-1;
4680 u8 struct_v, struct_cv;
4681 int err = -EINVAL;
4682
4683 ceph_decode_need(&p, end, sizeof(u32), bad);
4684 epoch = ceph_decode_32(&p);
4685
4686 dout("handle_fsmap epoch %u\n", epoch);
4687
4688 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4689 struct_v = ceph_decode_8(&p);
4690 struct_cv = ceph_decode_8(&p);
4691 map_len = ceph_decode_32(&p);
4692
4693 ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4694 p += sizeof(u32) * 2;
4695
4696 num_fs = ceph_decode_32(&p);
4697 while (num_fs-- > 0) {
4698 void *info_p, *info_end;
4699 u32 info_len;
4700 u8 info_v, info_cv;
4701 u32 fscid, namelen;
4702
4703 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4704 info_v = ceph_decode_8(&p);
4705 info_cv = ceph_decode_8(&p);
4706 info_len = ceph_decode_32(&p);
4707 ceph_decode_need(&p, end, info_len, bad);
4708 info_p = p;
4709 info_end = p + info_len;
4710 p = info_end;
4711
4712 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4713 fscid = ceph_decode_32(&info_p);
4714 namelen = ceph_decode_32(&info_p);
4715 ceph_decode_need(&info_p, info_end, namelen, bad);
4716
4717 if (mds_namespace &&
4718 strlen(mds_namespace) == namelen &&
4719 !strncmp(mds_namespace, (char *)info_p, namelen)) {
4720 mount_fscid = fscid;
4721 break;
4722 }
4723 }
4724
4725 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4726 if (mount_fscid != (u32)-1) {
4727 fsc->client->monc.fs_cluster_id = mount_fscid;
4728 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4729 0, true);
4730 ceph_monc_renew_subs(&fsc->client->monc);
4731 } else {
4732 err = -ENOENT;
4733 goto err_out;
4734 }
4735 return;
4736
4737bad:
4738 pr_err("error decoding fsmap\n");
4739err_out:
4740 mutex_lock(&mdsc->mutex);
4741 mdsc->mdsmap_err = err;
4742 __wake_requests(mdsc, &mdsc->waiting_for_map);
4743 mutex_unlock(&mdsc->mutex);
4744}
4745
4746
4747
4748
4749void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4750{
4751 u32 epoch;
4752 u32 maplen;
4753 void *p = msg->front.iov_base;
4754 void *end = p + msg->front.iov_len;
4755 struct ceph_mdsmap *newmap, *oldmap;
4756 struct ceph_fsid fsid;
4757 int err = -EINVAL;
4758
4759 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4760 ceph_decode_copy(&p, &fsid, sizeof(fsid));
4761 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
4762 return;
4763 epoch = ceph_decode_32(&p);
4764 maplen = ceph_decode_32(&p);
4765 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
4766
4767
4768 mutex_lock(&mdsc->mutex);
4769 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
4770 dout("handle_map epoch %u <= our %u\n",
4771 epoch, mdsc->mdsmap->m_epoch);
4772 mutex_unlock(&mdsc->mutex);
4773 return;
4774 }
4775
4776 newmap = ceph_mdsmap_decode(&p, end);
4777 if (IS_ERR(newmap)) {
4778 err = PTR_ERR(newmap);
4779 goto bad_unlock;
4780 }
4781
4782
4783 if (mdsc->mdsmap) {
4784 oldmap = mdsc->mdsmap;
4785 mdsc->mdsmap = newmap;
4786 check_new_map(mdsc, newmap, oldmap);
4787 ceph_mdsmap_destroy(oldmap);
4788 } else {
4789 mdsc->mdsmap = newmap;
4790 }
4791 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
4792 MAX_LFS_FILESIZE);
4793
4794 __wake_requests(mdsc, &mdsc->waiting_for_map);
4795 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4796 mdsc->mdsmap->m_epoch);
4797
4798 mutex_unlock(&mdsc->mutex);
4799 schedule_delayed(mdsc);
4800 return;
4801
4802bad_unlock:
4803 mutex_unlock(&mdsc->mutex);
4804bad:
4805 pr_err("error decoding mdsmap %d\n", err);
4806 return;
4807}
4808
4809static struct ceph_connection *con_get(struct ceph_connection *con)
4810{
4811 struct ceph_mds_session *s = con->private;
4812
4813 if (ceph_get_mds_session(s))
4814 return con;
4815 return NULL;
4816}
4817
4818static void con_put(struct ceph_connection *con)
4819{
4820 struct ceph_mds_session *s = con->private;
4821
4822 ceph_put_mds_session(s);
4823}
4824
4825
4826
4827
4828
4829static void peer_reset(struct ceph_connection *con)
4830{
4831 struct ceph_mds_session *s = con->private;
4832 struct ceph_mds_client *mdsc = s->s_mdsc;
4833
4834 pr_warn("mds%d closed our session\n", s->s_mds);
4835 send_mds_reconnect(mdsc, s);
4836}
4837
4838static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4839{
4840 struct ceph_mds_session *s = con->private;
4841 struct ceph_mds_client *mdsc = s->s_mdsc;
4842 int type = le16_to_cpu(msg->hdr.type);
4843
4844 mutex_lock(&mdsc->mutex);
4845 if (__verify_registered_session(mdsc, s) < 0) {
4846 mutex_unlock(&mdsc->mutex);
4847 goto out;
4848 }
4849 mutex_unlock(&mdsc->mutex);
4850
4851 switch (type) {
4852 case CEPH_MSG_MDS_MAP:
4853 ceph_mdsc_handle_mdsmap(mdsc, msg);
4854 break;
4855 case CEPH_MSG_FS_MAP_USER:
4856 ceph_mdsc_handle_fsmap(mdsc, msg);
4857 break;
4858 case CEPH_MSG_CLIENT_SESSION:
4859 handle_session(s, msg);
4860 break;
4861 case CEPH_MSG_CLIENT_REPLY:
4862 handle_reply(s, msg);
4863 break;
4864 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4865 handle_forward(mdsc, s, msg);
4866 break;
4867 case CEPH_MSG_CLIENT_CAPS:
4868 ceph_handle_caps(s, msg);
4869 break;
4870 case CEPH_MSG_CLIENT_SNAP:
4871 ceph_handle_snap(mdsc, s, msg);
4872 break;
4873 case CEPH_MSG_CLIENT_LEASE:
4874 handle_lease(mdsc, s, msg);
4875 break;
4876 case CEPH_MSG_CLIENT_QUOTA:
4877 ceph_handle_quota(mdsc, s, msg);
4878 break;
4879
4880 default:
4881 pr_err("received unknown message type %d %s\n", type,
4882 ceph_msg_type_name(type));
4883 }
4884out:
4885 ceph_msg_put(msg);
4886}
4887
4888
4889
4890
4891
4892
4893
4894
4895
4896static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4897 int *proto, int force_new)
4898{
4899 struct ceph_mds_session *s = con->private;
4900 struct ceph_mds_client *mdsc = s->s_mdsc;
4901 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4902 struct ceph_auth_handshake *auth = &s->s_auth;
4903
4904 if (force_new && auth->authorizer) {
4905 ceph_auth_destroy_authorizer(auth->authorizer);
4906 auth->authorizer = NULL;
4907 }
4908 if (!auth->authorizer) {
4909 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4910 auth);
4911 if (ret)
4912 return ERR_PTR(ret);
4913 } else {
4914 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4915 auth);
4916 if (ret)
4917 return ERR_PTR(ret);
4918 }
4919 *proto = ac->protocol;
4920
4921 return auth;
4922}
4923
4924static int add_authorizer_challenge(struct ceph_connection *con,
4925 void *challenge_buf, int challenge_buf_len)
4926{
4927 struct ceph_mds_session *s = con->private;
4928 struct ceph_mds_client *mdsc = s->s_mdsc;
4929 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4930
4931 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
4932 challenge_buf, challenge_buf_len);
4933}
4934
4935static int verify_authorizer_reply(struct ceph_connection *con)
4936{
4937 struct ceph_mds_session *s = con->private;
4938 struct ceph_mds_client *mdsc = s->s_mdsc;
4939 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4940
4941 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4942}
4943
4944static int invalidate_authorizer(struct ceph_connection *con)
4945{
4946 struct ceph_mds_session *s = con->private;
4947 struct ceph_mds_client *mdsc = s->s_mdsc;
4948 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4949
4950 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4951
4952 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4953}
4954
4955static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4956 struct ceph_msg_header *hdr, int *skip)
4957{
4958 struct ceph_msg *msg;
4959 int type = (int) le16_to_cpu(hdr->type);
4960 int front_len = (int) le32_to_cpu(hdr->front_len);
4961
4962 if (con->in_msg)
4963 return con->in_msg;
4964
4965 *skip = 0;
4966 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4967 if (!msg) {
4968 pr_err("unable to allocate msg type %d len %d\n",
4969 type, front_len);
4970 return NULL;
4971 }
4972
4973 return msg;
4974}
4975
4976static int mds_sign_message(struct ceph_msg *msg)
4977{
4978 struct ceph_mds_session *s = msg->con->private;
4979 struct ceph_auth_handshake *auth = &s->s_auth;
4980
4981 return ceph_auth_sign_message(auth, msg);
4982}
4983
4984static int mds_check_message_signature(struct ceph_msg *msg)
4985{
4986 struct ceph_mds_session *s = msg->con->private;
4987 struct ceph_auth_handshake *auth = &s->s_auth;
4988
4989 return ceph_auth_check_message_signature(auth, msg);
4990}
4991
4992static const struct ceph_connection_operations mds_con_ops = {
4993 .get = con_get,
4994 .put = con_put,
4995 .dispatch = dispatch,
4996 .get_authorizer = get_authorizer,
4997 .add_authorizer_challenge = add_authorizer_challenge,
4998 .verify_authorizer_reply = verify_authorizer_reply,
4999 .invalidate_authorizer = invalidate_authorizer,
5000 .peer_reset = peer_reset,
5001 .alloc_msg = mds_alloc_msg,
5002 .sign_message = mds_sign_message,
5003 .check_message_signature = mds_check_message_signature,
5004};
5005
5006
5007