#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/utsname.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
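
/*
 * The MDS client handles all interaction with the ceph metadata
 * servers: opening and maintaining sessions, assembling and sending
 * metadata requests, parsing the replies, and releasing or trimming
 * the capabilities ("caps") the MDS grants for cached inode state.
 */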
struct ceph_reconnect_state {
	int nr_caps;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;
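
/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */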
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = -EIO;

	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
		ceph_decode_copy_safe(p, end, &info->dir_layout,
				      sizeof(info->dir_layout), bad);
	else
		memset(&info->dir_layout, 0, sizeof(info->dir_layout));

	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
	} else
		info->inline_version = CEPH_INLINE_NONE;

	if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		ceph_decode_need(p, end, info->pool_ns_len, bad);
		*p += info->pool_ns_len;
	} else {
		info->pool_ns_len = 0;
	}

	return 0;
bad:
	return err;
}
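
/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */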
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		if (unlikely(*p + sizeof(*info->dirfrag) > end))
			goto bad;
		info->dirfrag = *p;
		*p += sizeof(*info->dirfrag) +
			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
		if (unlikely(*p > end))
			goto bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;
		info->dlease = *p;
		*p += sizeof(*info->dlease);
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}
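
/*
 * parse readdir results
 */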
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info,
				u64 features)
{
	u32 num, i = 0;
	int err;

	info->dir_dir = *p;
	if (*p + sizeof(*info->dir_dir) > end)
		goto bad;
	*p += sizeof(*info->dir_dir) +
		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
	if (*p > end)
		goto bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		/* dentry */
		ceph_decode_need(p, end, sizeof(u32)*2, bad);
		rde->name_len = ceph_decode_32(p);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
		rde->lease = *p;
		*p += sizeof(struct ceph_mds_reply_lease);

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;

		rde->offset = 0;
		i++;
		num--;
	}

done:
	if (*p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}
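
/*
 * parse fcntl F_GETLK results
 */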
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;
	*p += sizeof(*info->filelock_reply);

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	return -EIO;
}
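
/*
 * parse create results
 */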
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features)
{
	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
		if (*p == end) {
			info->has_create_ino = false;
		} else {
			info->has_create_ino = true;
			info->ino = ceph_decode_64(p);
		}
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	return -EIO;
}
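
/*
 * parse extra results
 */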
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_dir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features);
	else
		return -EIO;
}
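
/*
 * parse entire mds reply
 */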
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}
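
/*
 * sessions
 */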
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
	if (atomic_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
	if (atomic_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		kfree(s);
	}
}
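
/*
 * called under mdsc->mutex
 */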
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	struct ceph_mds_session *session;

	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	session = mdsc->sessions[mds];
	dout("lookup_mds_session %p %d\n", session,
	     atomic_read(&session->s_ref));
	get_session(session);
	return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions)
		return false;
	return mdsc->sessions[mds];
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}
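
/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */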
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->m_num_mds)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_gen = 0;
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	s->s_trim_caps = 0;
	atomic_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	s->s_num_cap_releases = 0;
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	dout("register_session mds%d\n", mds);
	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds+1);
		struct ceph_mds_session **sa;

		dout("register_session realloc to %d\n", newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}
	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
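
/*
 * called under mdsc->mutex
 */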
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}
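
/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */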
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent)
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
	iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
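
/*
 * lookup session, bump ref if found.
 *
 * called under mdsc->mutex.
 */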
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}
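
/*
 * Register an in-flight request, and assign a tid.  Link the request
 * to the directory it modifies (if any).
 *
 * Called under mdsc->mutex.
 */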
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps)
		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
				  req->r_num_caps);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		ihold(dir);
		req->r_unsafe_dir = dir;
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}
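
/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode.  Must be called under rcu_read_lock() to avoid
 * races with renames.
 */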
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}
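
/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult the frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */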
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("choose_mds using resend_mds mds%d\n",
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("__choose_mds using snapdir's parent %p\n", inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = req->r_dentry->d_parent;
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("__choose_mds using nonsnap parent %p\n", inode);
		} else {
			/* dentry target */
			inode = req->r_dentry->d_inode;
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
	     (int)hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (%d/%d)\n",
				     inode, ceph_vinop(inode),
				     frag.frag, mds,
				     (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			mode = USE_AUTH_MDS;
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (auth)\n",
				     inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					goto out;
			}
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("choose_mds chose random mds%d\n", mds);
	return mds;
}
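
/*
 * session messages
 */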
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}
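
/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */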
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i = -1;
	int metadata_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	void *p;

	const char *metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	metadata_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		metadata_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v2
	 */
	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p = msg->front.iov_base + sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	return msg;
}
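
/*
 * send session open request.
 *
 * called under mdsc->mutex
 */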
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	dout("open_session to mds%d (%s)\n", mds,
	     ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_open_msg(mdsc, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}
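
/*
 * open a session with the given export target mds, if there is not
 * one already.  called under mdsc->mutex.
 */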
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING)
		__open_session(mdsc, session);

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	dout("open_export_target_session to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;

	if (mds >= mdsc->mdsmap->m_num_mds)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	dout("open_export_target_sessions for mds%d (%d targets)\n",
	     session->s_mds, mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		if (!IS_ERR(ts))
			ceph_put_mds_session(ts);
	}
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}
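
/*
 * session caps
 */

/* caller holds s_cap_lock, we drop it */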
static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
	__releases(session->s_cap_lock)
{
	LIST_HEAD(tmp_list);
	list_splice_init(&session->s_cap_releases, &tmp_list);
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	dout("cleanup_cap_releases mds%d\n", session->s_mds);
	while (!list_empty(&tmp_list)) {
		struct ceph_cap *cap;

		cap = list_first_entry(&tmp_list,
				       struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_mds_request *req;
	struct rb_node *p;

	dout("cleanup_session_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited(" dropping unsafe request %llu\n",
				    req->r_tid);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}
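
/*
 * Iterate over all caps associated with a session, calling @cb on
 * each.  s_cap_lock is dropped around each callback, so @cb may
 * block; the current cap is pinned via s_cap_iterator so the walk
 * can safely continue afterwards.
 */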
static int iterate_session_caps(struct ceph_mds_session *session,
				int (*cb)(struct inode *, struct ceph_cap *,
					  void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, cap, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			dout("iterate_session_caps finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			if (cap->queue_release) {
				list_add_tail(&cap->session_caps,
					      &session->s_cap_releases);
				session->s_num_cap_releases++;
			} else {
				old_cap = cap;
			}
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	LIST_HEAD(to_remove);
	bool drop = false;
	bool invalidate = false;

	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	spin_lock(&ci->i_ceph_lock);
	__ceph_remove_cap(cap, false);
	if (!ci->i_auth_cap) {
		struct ceph_cap_flush *cf;
		struct ceph_mds_client *mdsc = fsc->mdsc;

		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;

		if (ci->i_wrbuffer_ref > 0 &&
		    ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
			invalidate = true;

		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_move(&cf->i_list, &to_remove);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		list_for_each_entry(cf, &to_remove, i_list)
			list_del(&cf->g_list);

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited(
				" dropping dirty %s state for %p %lld\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_ino(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			drop = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited(
				" dropping dirty+flushing %s state for %p %lld\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_ino(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			drop = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
			ci->i_prealloc_cap_flush = NULL;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	while (!list_empty(&to_remove)) {
		struct ceph_cap_flush *cf;
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	if (drop)
		iput(inode);
	return 0;
}
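
/*
 * caller must hold session s_mutex
 */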
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	dout("remove_session_caps on %p\n", session);
	iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	/* drop cap expires and unsolicited cap releases */
	cleanup_cap_releases(session->s_mdsc, session);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
}
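
/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */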
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
			      void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (arg) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session,
				 int reconnect)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	iterate_session_caps(session, wake_up_session_cb,
			     (void *)(unsigned long)reconnect);
}
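
/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */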
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_msg *msg;

	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state), seq);
	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}
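
/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 */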
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, 0);
}
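
/*
 * send a session close request
 */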
static int request_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}
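
/*
 * start closing the session, if it is not already closing.
 */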
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}
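
/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too, and the cap unused.
 */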
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	struct ceph_mds_session *session = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (session->s_trim_caps <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	session->s_trim_caps--;
	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, true);
	} else {
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		d_prune_aliases(inode);
		dout("trim_caps_cb %p cap %p pruned, count now %d\n",
		     inode, cap, atomic_read(&inode->i_count));
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}
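
/*
 * trim session cap count down to some max number.
 */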
static int trim_caps(struct ceph_mds_client *mdsc,
		     struct ceph_mds_session *session,
		     int max_caps)
{
	int trim_caps = session->s_nr_caps - max_caps;

	dout("trim_caps mds%d start: %d / %d, trim %d\n",
	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		session->s_trim_caps = trim_caps;
		iterate_session_caps(session, trim_caps_cb, session);
		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
		     session->s_mds, session->s_nr_caps, max_caps,
		     trim_caps - session->s_trim_caps);
		session->s_trim_caps = 0;
	}

	ceph_send_cap_releases(mdsc, session);
	return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			dout("check_caps_flush still flushing tid "
			     "%llu <= %llu\n", cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}
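
/*
 * wait for all dirty cap data, up to and including @want_flush_tid,
 * to be flushed back to the MDS.
 */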
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	dout("check_caps_flush want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}
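
/*
 * called under s_mutex
 */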
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
			    struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		le32_add_cpu(&head->num, 1);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			/* append cap_barrier field */
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	if (msg) {
		/* append cap_barrier field */
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
	       session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}
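
/*
 * requests
 */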
int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	int order, num_entries;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1);
	num_entries = min(num_entries, opt->max_readdir);

	order = get_order(size * num_entries);
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries)
		return -ENOMEM;

	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}
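
/*
 * Create an mds request.
 */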
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);

	if (!req)
		return ERR_PTR(-ENOMEM);

	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	kref_init(&req->r_kref);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	req->r_stamp = current_fs_time(mdsc->fsc->sb);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}
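
/*
 * return oldest (lowest) request, tid in request tree, 0 if none.
 *
 * called under mdsc->mutex.
 */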
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
{
	if (RB_EMPTY_ROOT(&mdsc->request_tree))
		return NULL;
	return rb_entry(rb_first(&mdsc->request_tree),
			struct ceph_mds_request, r_node);
}

static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}
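
/*
 * Build a dentry's path, relative to the first non-snapped inode if
 * @stop_on_nosnap is set.  Allocated on the heap; the caller must
 * kfree the result.  Hidden .snap dirs are encoded as a double /,
 * i.e. foo/.snap/bar -> foo//bar.
 */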
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
			   int stop_on_nosnap)
{
	struct dentry *temp;
	char *path;
	int len, pos;
	unsigned seq;

	if (!dentry)
		return ERR_PTR(-EINVAL);

retry:
	len = 0;
	seq = read_seqbegin(&rename_lock);
	rcu_read_lock();
	for (temp = dentry; !IS_ROOT(temp);) {
		struct inode *inode = temp->d_inode;
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
			len++;  /* slash only */
		else if (stop_on_nosnap && inode &&
			 ceph_snap(inode) == CEPH_NOSNAP)
			break;
		else
			len += 1 + temp->d_name.len;
		temp = temp->d_parent;
	}
	rcu_read_unlock();
	if (len)
		len--;  /* no leading '/' */

	path = kmalloc(len+1, GFP_NOFS);
	if (!path)
		return ERR_PTR(-ENOMEM);
	pos = len;
	path[pos] = 0;	/* trailing null */
	rcu_read_lock();
	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
		struct inode *inode;

		spin_lock(&temp->d_lock);
		inode = temp->d_inode;
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			dout("build_path path+%d: %p SNAPDIR\n",
			     pos, temp);
		} else if (stop_on_nosnap && inode &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&temp->d_lock);
			break;
		} else {
			pos -= temp->d_name.len;
			if (pos < 0) {
				spin_unlock(&temp->d_lock);
				break;
			}
			strncpy(path + pos, temp->d_name.name,
				temp->d_name.len);
		}
		spin_unlock(&temp->d_lock);
		if (pos)
			path[--pos] = '/';
		temp = temp->d_parent;
	}
	rcu_read_unlock();
	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
		pr_err("build_path did not end path lookup where "
		       "expected, namelen is %d, pos is %d\n", len, pos);
		/* presumably this is only possible if racing with a
		   rename of one of the parent directories (we can not
		   lock the dentries above us to prevent this, but
		   retrying should be harmless) */
		kfree(path);
		goto retry;
	}

	*base = ceph_ino(temp->d_inode);
	*plen = len;
	dout("build_path on %p %d built %llx '%.*s'\n",
	     dentry, d_count(dentry), *base, len, path);
	return path;
}

static int build_dentry_path(struct dentry *dentry, struct inode *dir,
			     const char **ppath, int *ppathlen, u64 *pino,
			     int *pfreepath)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
		*pino = ceph_ino(dir);
		rcu_read_unlock();
		*ppath = dentry->d_name.name;
		*ppathlen = dentry->d_name.len;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = 1;
	return 0;
}

static int build_inode_path(struct inode *inode,
			    const char **ppath, int *ppathlen, u64 *pino,
			    int *pfreepath)
{
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(inode);
		*ppathlen = 0;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = 1;
	return 0;
}
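
/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 */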
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				 struct inode *rdiri, const char *rpath,
				 u64 rino, const char **ppath, int *pathlen,
				 u64 *ino, int *freepath)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
				      freepath);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath || rino) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = rpath ? strlen(rpath) : 0;
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}
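
/*
 * called under mdsc->mutex
 */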
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds, bool drop_cap_releases)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	int freepath1 = 0, freepath2 = 0;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += req->r_dentry->d_name.len;
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_old_dentry->d_inode,
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	/* time stamp */
	{
		struct ceph_timespec ts;
		ceph_encode_timespec(&ts, &req->r_stamp);
		ceph_encode_copy(&p, &ts, sizeof(ts));
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		atomic_inc(&pagelist->refcnt);
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		kfree((char *)path2);
out_free1:
	if (freepath1)
		kfree((char *)path1);
out:
	return msg;
}
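
/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */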
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	if (req->r_callback)
		req->r_callback(mdsc, req);
	else
		complete_all(&req->r_completion);
}
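
/*
 * called under mdsc->mutex
 */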
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds, bool drop_cap_releases)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/* time stamp */
		p = msg->front.iov_base + req->r_request_release_offset;
		{
			struct ceph_timespec ts;
			ceph_encode_timespec(&ts, &req->r_stamp);
			ceph_encode_copy(&p, &ts, sizeof(ts));
		}

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;
	rhead->ino = 0;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}
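
/*
 * send request, or put it on the wait list
 */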
static int __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;

	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		goto out;
	}

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -EIO;
		goto finish;
	}
	if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			goto finish;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -ENOENT;
			pr_info("probably no mds server is up\n");
			goto finish;
		}
	}

	put_request_session(req);

	mds = __choose_mds(mdsc, req);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		goto out;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = get_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			err = -EACCES;
			goto out_session;
		}
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING)
			__open_session(mdsc, session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __prepare_send_request(mdsc, req, mds, false);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
out:
	return err;
}
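
/*
 * called under mdsc->mutex
 */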
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}

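/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */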
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}

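/*
 * Synchronously perform an mds request.  Takes care of all of the
 * session setup, forwarding, and retry details.
 *
 * A typical caller (a sketch, not copied from any one call site; the
 * helpers named below are the mdsc request API used by fs/ceph/dir.c
 * and friends) looks roughly like:
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP,
 *				       USE_ANY_MDS);
 *	...fill in paths, inode refs, caps wanted...
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */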
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent)
		ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/* deny access to directories with pool_ns layouts */
	if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
	    ceph_inode(req->r_inode)->i_pool_ns_len)
		return -EIO;
	if (test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
	    ceph_inode(req->r_parent)->i_pool_ns_len)
		return -EIO;

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	if (req->r_err) {
		err = req->r_err;
		goto out;
	}

	/* wait */
	mutex_unlock(&mdsc->mutex);
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		err = req->r_wait_for_completion(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -EIO;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

out:
	mutex_unlock(&mdsc->mutex);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}

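/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * request.
 */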
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *inode = req->r_parent;

	dout("invalidate_dir_request %p (complete, lease(s))\n", inode);

	ceph_dir_clear_complete(inode);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

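/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */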
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn("got a dup %s reply on %llu from mds%d\n",
			head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn("got unsafe after safe on %llu from mds%d\n",
			tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	/*
	 * Handle an ESTALE
	 * if we're not talking to the authority, send to them
	 * if the authority has changed while we weren't looking,
	 * send to new authority
	 * Otherwise we just have to return an ESTALE
	 */
	if (result == -ESTALE) {
		dout("got ESTALE on request %llu\n", req->r_tid);
		req->r_resend_mds = -1;
		if (req->r_direct_mode != USE_AUTH_MDS) {
			dout("not using auth, setting for that now\n");
			req->r_direct_mode = USE_AUTH_MDS;
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			goto out;
		} else {
			int mds = __choose_mds(mdsc, req);
			if (mds >= 0 && mds != req->r_session->s_mds) {
				dout("but auth changed, so resending\n");
				__do_request(mdsc, req);
				mutex_unlock(&mdsc->mutex);
				goto out;
			}
		}
		dout("have to return ESTALE on request %llu\n", req->r_tid);
	}

	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);

			/* last unsafe request during umount? */
			if (mdsc->stopping && !__get_oldest_req(mdsc))
				complete_all(&mdsc->safe_umount_waiters);
			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
		if (req->r_unsafe_dir) {
			struct ceph_inode_info *ci =
					ceph_inode(req->r_unsafe_dir);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_dir_item,
				      &ci->i_unsafe_dirops);
			spin_unlock(&ci->i_unsafe_lock);
		}
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n",
		       mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0 && req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
		spin_unlock(&ci->i_unsafe_lock);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);
out:
	ceph_mdsc_put_request(req);
	return;
}

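/*
 * handle mds notification that our request has been forwarded.
 */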
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}

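/*
 * handle a mds session control message
 */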
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	u32 op;
	u64 seq;
	int mds = session->s_mds;
	struct ceph_mds_session_head *h = msg->front.iov_base;
	int wake = 0;

	/* decode */
	if (msg->front.iov_len != sizeof(*h))
		goto bad;
	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE) {
		get_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		spin_lock(&session->s_gen_ttl_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = jiffies - 1;
		spin_unlock(&session->s_gen_ttl_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	case CEPH_SESSION_FLUSHMSG:
		send_flushmsg_ack(mdsc, session, seq);
		break;

	case CEPH_SESSION_FORCE_RO:
		dout("force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, 0);
		break;

	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info("mds%d rejected session\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		if (wake == 2)
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	}
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	ceph_msg_dump(msg);
	return;
}

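/*
 * called under session->s_mutex.
 */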
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	struct rb_node *p;
	int err;

	dout("replay_unsafe_requests mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
		err = __prepare_send_request(mdsc, req, session->s_mds, true);
		if (!err) {
			ceph_msg_get(req->r_request);
			ceph_con_send(&session->s_con, req->r_request);
		}
	}

	/*
	 * also re-send old requests when MDS enters reconnect stage,
	 * so that the MDS can process completed requests in its
	 * clientreplay stage.
	 */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds) {
			err = __prepare_send_request(mdsc, req,
						     session->s_mds, true);
			if (!err) {
				ceph_msg_get(req->r_request);
				ceph_con_send(&session->s_con, req->r_request);
			}
		}
	}
	mutex_unlock(&mdsc->mutex);
}

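/*
 * Encode information about a cap for a reconnect with the MDS.
 */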
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
			  void *arg)
{
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	char *path;
	int pathlen, err;
	u64 pathbase;
	u64 snap_follows;
	struct dentry *dentry;

	ci = cap->ci;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));
	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
	if (err)
		return err;

	dentry = d_find_alias(inode);
	if (dentry) {
		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_dput;
		}
	} else {
		path = NULL;
		pathlen = 0;
		pathbase = 0;
	}

	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = cap->session->s_cap_gen;

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(pathbase);
		rec.v2.flock_len = 0;
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(inode->i_size);
		ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(pathbase);
	}

	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks;
		size_t struct_len, total_len = 0;
		u8 struct_v = 0;

encode_again:
		spin_lock(&inode->i_lock);
		ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		spin_unlock(&inode->i_lock);
		flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
				 sizeof(struct ceph_filelock), GFP_NOFS);
		if (!flocks) {
			err = -ENOMEM;
			goto out_free;
		}
		spin_lock(&inode->i_lock);
		err = ceph_encode_locks_to_buffer(inode, flocks,
						  num_fcntl_locks,
						  num_flock_locks);
		spin_unlock(&inode->i_lock);
		if (err) {
			kfree(flocks);
			if (err == -ENOSPC)
				goto encode_again;
			goto out_free;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len = 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			    (num_fcntl_locks + num_flock_locks) *
			    sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(rec.v2);
		struct_len += sizeof(u32) + pathlen;

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;
		err = ceph_pagelist_reserve(pagelist, total_len);

		if (!err) {
			if (recon_state->msg_version >= 3) {
				ceph_pagelist_encode_8(pagelist, struct_v);
				ceph_pagelist_encode_8(pagelist, 1);
				ceph_pagelist_encode_32(pagelist, struct_len);
			}
			ceph_pagelist_encode_string(pagelist, path, pathlen);
			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
			ceph_locks_to_pagelist(flocks, pagelist,
					       num_fcntl_locks,
					       num_flock_locks);
			if (struct_v >= 2)
				ceph_pagelist_encode_64(pagelist, snap_follows);
		}
		kfree(flocks);
	} else {
		size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
		err = ceph_pagelist_reserve(pagelist, size);
		if (!err) {
			ceph_pagelist_encode_string(pagelist, path, pathlen);
			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
		}
	}

	recon_state->nr_caps++;
out_free:
	kfree(path);
out_dput:
	dput(dentry);
	return err;
}

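/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * called with mdsc->mutex held.
 */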
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_msg *reply;
	struct rb_node *p;
	int mds = session->s_mds;
	int err = -ENOMEM;
	int s_nr_caps;
	struct ceph_pagelist *pagelist;
	struct ceph_reconnect_state recon_state;

	pr_info("mds%d reconnect start\n", mds);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		goto fail_nopagelist;
	ceph_pagelist_init(pagelist);

	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	dout("session %p state %s\n", session,
	     ceph_session_state_name(session->s_state));

	spin_lock(&session->s_gen_ttl_lock);
	session->s_cap_gen++;
	spin_unlock(&session->s_gen_ttl_lock);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap gets released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	cleanup_cap_releases(mdsc, session);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	down_read(&mdsc->snap_rwsem);

	/* traverse this session's caps */
	s_nr_caps = session->s_nr_caps;
	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
	if (err)
		goto fail;

	recon_state.nr_caps = 0;
	recon_state.pagelist = pagelist;
	if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
		recon_state.msg_version = 3;
	else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
		recon_state.msg_version = 2;
	else
		recon_state.msg_version = 1;
	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
	if (err < 0)
		goto fail;

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		dout(" adding snap realm %llx seq %lld parent %llx\n",
		     realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);
		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);

	/* raced with cap release? */
	if (s_nr_caps != recon_state.nr_caps) {
		struct page *page = list_first_entry(&pagelist->head,
						     struct page, lru);
		__le32 *addr = kmap_atomic(page);
		*addr = cpu_to_le32(recon_state.nr_caps);
		kunmap_atomic(addr);
	}

	reply->hdr.data_len = cpu_to_le32(pagelist->length);
	ceph_msg_data_add_pagelist(reply, pagelist);

	ceph_early_kick_flushing_caps(mdsc, session);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(pagelist);
fail_nopagelist:
	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
	return;
}

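/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 */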
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i;
	int oldstate, newstate;
	struct ceph_mds_session *s;

	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);

	for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     ceph_session_state_name(s->s_state));

		if (i >= newmap->m_num_mds ||
		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
				/* the session never opened, just close it
				 * out now */
				get_session(s);
				__unregister_session(mdsc, s);
				__wake_requests(mdsc, &s->s_waiting);
				ceph_put_mds_session(s);
			} else if (i >= newmap->m_num_mds) {
				/* force close session for stopped mds */
				get_session(s);
				__unregister_session(mdsc, s);
				__wake_requests(mdsc, &s->s_waiting);
				kick_requests(mdsc, i);
				mutex_unlock(&mdsc->mutex);

				mutex_lock(&s->s_mutex);
				cleanup_session_requests(mdsc, s);
				remove_session_caps(s);
				mutex_unlock(&s->s_mutex);

				ceph_put_mds_session(s);

				mutex_lock(&mdsc->mutex);
			} else {
				/* just close it */
				mutex_unlock(&mdsc->mutex);
				mutex_lock(&s->s_mutex);
				mutex_lock(&mdsc->mutex);
				ceph_con_close(&s->s_con);
				mutex_unlock(&s->s_mutex);
				s->s_state = CEPH_MDS_SESSION_RESTARTING;
			}
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			ceph_kick_flushing_caps(mdsc, s);
			wake_up_session_caps(s, 1);
		}
	}

	for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			     i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}

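/*
 * leases
 */

/* caller must hold session s_mutex, dentry->d_lock */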
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}

static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	dout("handle_lease from mds%d\n", mds);

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
	if (dname.len != get_unaligned_le32(h+1))
		goto bad;

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	dout("handle_lease %s, ino %llx %p %.*s\n",
	     ceph_lease_op_name(h->action), vino.ino, inode,
	     dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	session->s_seq++;

	if (!inode) {
		dout("handle_lease no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		dout("no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == session->s_cap_gen &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			dentry->d_time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
				(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	iput(inode);
	mutex_unlock(&session->s_mutex);
	return;

bad:
	pr_err("corrupt lease message\n");
	ceph_msg_dump(msg);
}

void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct inode *inode,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	int len = sizeof(*lease) + sizeof(u32);
	int dnamelen = 0;

	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
	dnamelen = dentry->d_name.len;
	len += dnamelen;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
	lease->seq = cpu_to_le32(seq);
	put_unaligned_le32(dnamelen, lease + 1);
	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);

	/*
	 * if this is a preemptive lease RELEASE, no need to
	 * flush request stream, since the actual request will
	 * soon be queued.
	 */
	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);

	ceph_con_send(&session->s_con, msg);
}

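/*
 * drop all leases (and dentry refs) in preparation for umount
 */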
static void drop_leases(struct ceph_mds_client *mdsc)
{
	int i;

	dout("drop_leases\n");
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&s->s_mutex);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

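/*
 * delayed work -- periodically trim expired leases, renew caps with mds
 */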
static void schedule_delayed(struct ceph_mds_client *mdsc)
{
	int delay = 5;
	unsigned hz = round_jiffies_relative(HZ * delay);
	schedule_delayed_work(&mdsc->delayed_work, hz);
}

static void delayed_work(struct work_struct *work)
{
	int i;
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	int renew_interval;
	int renew_caps;

	dout("mdsc delayed_work\n");
	ceph_check_delayed_caps(mdsc);

	mutex_lock(&mdsc->mutex);
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;
		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout("resending session close request for mds%d\n",
			     s->s_mds);
			request_close_session(mdsc, s);
			ceph_put_mds_session(s);
			continue;
		}
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
				s->s_state = CEPH_MDS_SESSION_HUNG;
				pr_info("mds%d hung\n", s->s_mds);
			}
		}
		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
			/* this mds is failed or recovering, just wait */
			ceph_put_mds_session(s);
			continue;
		}
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	schedule_delayed(mdsc);
}

int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc;

	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	fsc->mdsc = mdsc;
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
		kfree(mdsc);
		return -ENOMEM;
	}

	init_completion(&mdsc->safe_umount_waiters);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->sessions = NULL;
	atomic_set(&mdsc->num_sessions, 0);
	mdsc->max_sessions = 0;
	mdsc->stopping = 0;
	mdsc->last_snap_seq = 0;
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->last_tid = 0;
	mdsc->oldest_tid = 0;
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	mdsc->num_cap_flushing = 0;
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	spin_lock_init(&mdsc->dentry_lru_lock);
	INIT_LIST_HEAD(&mdsc->dentry_lru);

	ceph_caps_init(mdsc);
	ceph_adjust_min_caps(mdsc, fsc->min_caps);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strncpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename) - 1);
	return 0;
}

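/*
 * Wait for outstanding mds requests to drain at umount; if they take
 * too long, forcibly unregister whatever is left.
 */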
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		dout("wait_requests waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			     req->r_tid);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
}

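/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */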
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	dout("pre_umount\n");
	mdsc->stopping = 1;

	drop_leases(mdsc);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();
}

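/*
 * wait for all write mds requests to flush.
 */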
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
{
	struct ceph_mds_request *req = NULL, *nextreq;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	dout("wait_unsafe_requests want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request, if any */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			mutex_unlock(&mdsc->mutex);
			dout("wait_unsafe_requests wait on %llu (want %llu)\n",
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);
			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break;  /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);  /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_unsafe_requests done\n");
}

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	u64 want_tid, want_flush;

	if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return;

	dout("sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	dout("sync want tid %lld flush_seq %lld\n",
	     want_tid, want_flush);

	wait_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}

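/*
 * true if all sessions are closed, or we force unmount
 */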
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

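/*
 * called after sb is ro.
 */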
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	dout("close_sessions\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	dout("waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = get_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_empty_realms(mdsc);

	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	dout("stopped\n");
}

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	dout("force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	dout("mdsc_destroy %p\n", mdsc);

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	fsc->mdsc = NULL;
	kfree(mdsc);
	dout("mdsc_destroy %p done\n", mdsc);
}

void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 map_len;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	u8 struct_v, struct_cv;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	dout("handle_fsmap epoch %u\n", epoch);

	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
	struct_v = ceph_decode_8(&p);
	struct_cv = ceph_decode_8(&p);
	map_len = ceph_decode_32(&p);

	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */

	num_fs = ceph_decode_32(&p);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u8 info_v, info_cv;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		info_v = ceph_decode_8(&p);
		info_cv = ceph_decode_8(&p);
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;
bad:
	pr_err("error decoding fsmap\n");
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = -ENOENT;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
	return;
}

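/*
 * handle mds map update.
 */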
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end);
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d\n", err);
	return;
}

static struct ceph_connection *con_get(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (get_session(s)) {
		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
		return con;
	}
	dout("mdsc con_get %p FAIL\n", s);
	return NULL;
}

static void con_put(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
	ceph_put_mds_session(s);
}

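/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */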
static void peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn("mds%d closed our session\n", s->s_mds);
	send_mds_reconnect(mdsc, s);
}

static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

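/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */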
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
					int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	if (force_new && auth->authorizer) {
		ceph_auth_destroy_authorizer(auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}

static int verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.peer_reset = peer_reset,
	.alloc_msg = mds_alloc_msg,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
};