linux/fs/ceph/mds_client.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/fs.h>
   4#include <linux/wait.h>
   5#include <linux/slab.h>
   6#include <linux/gfp.h>
   7#include <linux/sched.h>
   8#include <linux/debugfs.h>
   9#include <linux/seq_file.h>
  10#include <linux/ratelimit.h>
  11
  12#include "super.h"
  13#include "mds_client.h"
  14
  15#include <linux/ceph/ceph_features.h>
  16#include <linux/ceph/messenger.h>
  17#include <linux/ceph/decode.h>
  18#include <linux/ceph/pagelist.h>
  19#include <linux/ceph/auth.h>
  20#include <linux/ceph/debugfs.h>
  21
  22/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
  29 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
  32 * If there is a MDS failure, we find out about it when we (possibly
  33 * request and) receive a new MDS map, and can resubmit affected
  34 * requests.
  35 *
  36 * For the most part, though, we take advantage of a lossless
  37 * communications channel to the MDS, and do not need to worry about
  38 * timing out or resubmitting requests.
  39 *
  40 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
  43 * the session times out and goes stale, our leases and capabilities
  44 * are no longer valid.
  45 */
  46
  47struct ceph_reconnect_state {
  48        int nr_caps;
  49        struct ceph_pagelist *pagelist;
  50        unsigned msg_version;
  51};
  52
  53static void __wake_requests(struct ceph_mds_client *mdsc,
  54                            struct list_head *head);
  55
  56static const struct ceph_connection_operations mds_con_ops;
  57
  58
  59/*
  60 * mds reply parsing
  61 */
  62
  63/*
  64 * parse individual inode info
  65 */
  66static int parse_reply_info_in(void **p, void *end,
  67                               struct ceph_mds_reply_info_in *info,
  68                               u64 features)
  69{
  70        int err = -EIO;
  71
  72        info->in = *p;
  73        *p += sizeof(struct ceph_mds_reply_inode) +
  74                sizeof(*info->in->fragtree.splits) *
  75                le32_to_cpu(info->in->fragtree.nsplits);
  76
  77        ceph_decode_32_safe(p, end, info->symlink_len, bad);
  78        ceph_decode_need(p, end, info->symlink_len, bad);
  79        info->symlink = *p;
  80        *p += info->symlink_len;
  81
  82        if (features & CEPH_FEATURE_DIRLAYOUTHASH)
  83                ceph_decode_copy_safe(p, end, &info->dir_layout,
  84                                      sizeof(info->dir_layout), bad);
  85        else
  86                memset(&info->dir_layout, 0, sizeof(info->dir_layout));
  87
  88        ceph_decode_32_safe(p, end, info->xattr_len, bad);
  89        ceph_decode_need(p, end, info->xattr_len, bad);
  90        info->xattr_data = *p;
  91        *p += info->xattr_len;
  92
  93        if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
  94                ceph_decode_64_safe(p, end, info->inline_version, bad);
  95                ceph_decode_32_safe(p, end, info->inline_len, bad);
  96                ceph_decode_need(p, end, info->inline_len, bad);
  97                info->inline_data = *p;
  98                *p += info->inline_len;
  99        } else
 100                info->inline_version = CEPH_INLINE_NONE;
 101
 102        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
 103                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
 104                ceph_decode_need(p, end, info->pool_ns_len, bad);
 105                *p += info->pool_ns_len;
 106        } else {
 107                info->pool_ns_len = 0;
 108        }
 109
 110        return 0;
 111bad:
 112        return err;
 113}
 114
 115/*
 116 * parse a normal reply, which may contain a (dir+)dentry and/or a
 117 * target inode.
 118 */
 119static int parse_reply_info_trace(void **p, void *end,
 120                                  struct ceph_mds_reply_info_parsed *info,
 121                                  u64 features)
 122{
 123        int err;
 124
 125        if (info->head->is_dentry) {
 126                err = parse_reply_info_in(p, end, &info->diri, features);
 127                if (err < 0)
 128                        goto out_bad;
 129
 130                if (unlikely(*p + sizeof(*info->dirfrag) > end))
 131                        goto bad;
 132                info->dirfrag = *p;
 133                *p += sizeof(*info->dirfrag) +
 134                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
 135                if (unlikely(*p > end))
 136                        goto bad;
 137
 138                ceph_decode_32_safe(p, end, info->dname_len, bad);
 139                ceph_decode_need(p, end, info->dname_len, bad);
 140                info->dname = *p;
 141                *p += info->dname_len;
 142                info->dlease = *p;
 143                *p += sizeof(*info->dlease);
 144        }
 145
 146        if (info->head->is_target) {
 147                err = parse_reply_info_in(p, end, &info->targeti, features);
 148                if (err < 0)
 149                        goto out_bad;
 150        }
 151
 152        if (unlikely(*p != end))
 153                goto bad;
 154        return 0;
 155
 156bad:
 157        err = -EIO;
 158out_bad:
 159        pr_err("problem parsing mds trace %d\n", err);
 160        return err;
 161}
 162
 163/*
 164 * parse readdir results
 165 */
 166static int parse_reply_info_dir(void **p, void *end,
 167                                struct ceph_mds_reply_info_parsed *info,
 168                                u64 features)
 169{
 170        u32 num, i = 0;
 171        int err;
 172
 173        info->dir_dir = *p;
 174        if (*p + sizeof(*info->dir_dir) > end)
 175                goto bad;
 176        *p += sizeof(*info->dir_dir) +
 177                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
 178        if (*p > end)
 179                goto bad;
 180
 181        ceph_decode_need(p, end, sizeof(num) + 2, bad);
 182        num = ceph_decode_32(p);
 183        {
 184                u16 flags = ceph_decode_16(p);
 185                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
 186                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
 187                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
 188                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
 189        }
 190        if (num == 0)
 191                goto done;
 192
 193        BUG_ON(!info->dir_entries);
 194        if ((unsigned long)(info->dir_entries + num) >
 195            (unsigned long)info->dir_entries + info->dir_buf_size) {
 196                pr_err("dir contents are larger than expected\n");
 197                WARN_ON(1);
 198                goto bad;
 199        }
 200
 201        info->dir_nr = num;
 202        while (num) {
 203                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
 204                /* dentry */
 205                ceph_decode_need(p, end, sizeof(u32)*2, bad);
 206                rde->name_len = ceph_decode_32(p);
 207                ceph_decode_need(p, end, rde->name_len, bad);
 208                rde->name = *p;
 209                *p += rde->name_len;
 210                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
 211                rde->lease = *p;
 212                *p += sizeof(struct ceph_mds_reply_lease);
 213
 214                /* inode */
 215                err = parse_reply_info_in(p, end, &rde->inode, features);
 216                if (err < 0)
 217                        goto out_bad;
 218                /* ceph_readdir_prepopulate() will update it */
 219                rde->offset = 0;
 220                i++;
 221                num--;
 222        }
 223
 224done:
 225        if (*p != end)
 226                goto bad;
 227        return 0;
 228
 229bad:
 230        err = -EIO;
 231out_bad:
 232        pr_err("problem parsing dir contents %d\n", err);
 233        return err;
 234}
 235
 236/*
 237 * parse fcntl F_GETLK results
 238 */
 239static int parse_reply_info_filelock(void **p, void *end,
 240                                     struct ceph_mds_reply_info_parsed *info,
 241                                     u64 features)
 242{
 243        if (*p + sizeof(*info->filelock_reply) > end)
 244                goto bad;
 245
 246        info->filelock_reply = *p;
 247        *p += sizeof(*info->filelock_reply);
 248
 249        if (unlikely(*p != end))
 250                goto bad;
 251        return 0;
 252
 253bad:
 254        return -EIO;
 255}
 256
 257/*
 258 * parse create results
 259 */
 260static int parse_reply_info_create(void **p, void *end,
 261                                  struct ceph_mds_reply_info_parsed *info,
 262                                  u64 features)
 263{
 264        if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
 265                if (*p == end) {
 266                        info->has_create_ino = false;
 267                } else {
 268                        info->has_create_ino = true;
 269                        info->ino = ceph_decode_64(p);
 270                }
 271        }
 272
 273        if (unlikely(*p != end))
 274                goto bad;
 275        return 0;
 276
 277bad:
 278        return -EIO;
 279}
 280
 281/*
 282 * parse extra results
 283 */
 284static int parse_reply_info_extra(void **p, void *end,
 285                                  struct ceph_mds_reply_info_parsed *info,
 286                                  u64 features)
 287{
 288        u32 op = le32_to_cpu(info->head->op);
 289
 290        if (op == CEPH_MDS_OP_GETFILELOCK)
 291                return parse_reply_info_filelock(p, end, info, features);
 292        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
 293                return parse_reply_info_dir(p, end, info, features);
 294        else if (op == CEPH_MDS_OP_CREATE)
 295                return parse_reply_info_create(p, end, info, features);
 296        else
 297                return -EIO;
 298}
 299
 300/*
 301 * parse entire mds reply
 302 */
 303static int parse_reply_info(struct ceph_msg *msg,
 304                            struct ceph_mds_reply_info_parsed *info,
 305                            u64 features)
 306{
 307        void *p, *end;
 308        u32 len;
 309        int err;
 310
 311        info->head = msg->front.iov_base;
 312        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
 313        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
 314
 315        /* trace */
 316        ceph_decode_32_safe(&p, end, len, bad);
 317        if (len > 0) {
 318                ceph_decode_need(&p, end, len, bad);
 319                err = parse_reply_info_trace(&p, p+len, info, features);
 320                if (err < 0)
 321                        goto out_bad;
 322        }
 323
 324        /* extra */
 325        ceph_decode_32_safe(&p, end, len, bad);
 326        if (len > 0) {
 327                ceph_decode_need(&p, end, len, bad);
 328                err = parse_reply_info_extra(&p, p+len, info, features);
 329                if (err < 0)
 330                        goto out_bad;
 331        }
 332
 333        /* snap blob */
 334        ceph_decode_32_safe(&p, end, len, bad);
 335        info->snapblob_len = len;
 336        info->snapblob = p;
 337        p += len;
 338
 339        if (p != end)
 340                goto bad;
 341        return 0;
 342
 343bad:
 344        err = -EIO;
 345out_bad:
 346        pr_err("mds parse_reply err %d\n", err);
 347        return err;
 348}
 349
 350static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 351{
 352        if (!info->dir_entries)
 353                return;
 354        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
 355}
 356
 357
 358/*
 359 * sessions
 360 */
 361const char *ceph_session_state_name(int s)
 362{
 363        switch (s) {
 364        case CEPH_MDS_SESSION_NEW: return "new";
 365        case CEPH_MDS_SESSION_OPENING: return "opening";
 366        case CEPH_MDS_SESSION_OPEN: return "open";
 367        case CEPH_MDS_SESSION_HUNG: return "hung";
 368        case CEPH_MDS_SESSION_CLOSING: return "closing";
 369        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 370        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 371        case CEPH_MDS_SESSION_REJECTED: return "rejected";
 372        default: return "???";
 373        }
 374}
 375
 376static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 377{
 378        if (atomic_inc_not_zero(&s->s_ref)) {
 379                dout("mdsc get_session %p %d -> %d\n", s,
 380                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
 381                return s;
 382        } else {
 383                dout("mdsc get_session %p 0 -- FAIL", s);
 384                return NULL;
 385        }
 386}
 387
 388void ceph_put_mds_session(struct ceph_mds_session *s)
 389{
 390        dout("mdsc put_session %p %d -> %d\n", s,
 391             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
 392        if (atomic_dec_and_test(&s->s_ref)) {
 393                if (s->s_auth.authorizer)
 394                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
 395                kfree(s);
 396        }
 397}
 398
 399/*
 400 * called under mdsc->mutex
 401 */
 402struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 403                                                   int mds)
 404{
 405        struct ceph_mds_session *session;
 406
 407        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 408                return NULL;
 409        session = mdsc->sessions[mds];
 410        dout("lookup_mds_session %p %d\n", session,
 411             atomic_read(&session->s_ref));
 412        get_session(session);
 413        return session;
 414}
 415
 416static bool __have_session(struct ceph_mds_client *mdsc, int mds)
 417{
 418        if (mds >= mdsc->max_sessions)
 419                return false;
 420        return mdsc->sessions[mds];
 421}
 422
 423static int __verify_registered_session(struct ceph_mds_client *mdsc,
 424                                       struct ceph_mds_session *s)
 425{
 426        if (s->s_mds >= mdsc->max_sessions ||
 427            mdsc->sessions[s->s_mds] != s)
 428                return -ENOENT;
 429        return 0;
 430}
 431
 432/*
 433 * create+register a new session for given mds.
 434 * called under mdsc->mutex.
 435 */
 436static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 437                                                 int mds)
 438{
 439        struct ceph_mds_session *s;
 440
 441        if (mds >= mdsc->mdsmap->m_num_mds)
 442                return ERR_PTR(-EINVAL);
 443
 444        s = kzalloc(sizeof(*s), GFP_NOFS);
 445        if (!s)
 446                return ERR_PTR(-ENOMEM);
 447        s->s_mdsc = mdsc;
 448        s->s_mds = mds;
 449        s->s_state = CEPH_MDS_SESSION_NEW;
 450        s->s_ttl = 0;
 451        s->s_seq = 0;
 452        mutex_init(&s->s_mutex);
 453
 454        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 455
 456        spin_lock_init(&s->s_gen_ttl_lock);
 457        s->s_cap_gen = 0;
 458        s->s_cap_ttl = jiffies - 1;
 459
 460        spin_lock_init(&s->s_cap_lock);
 461        s->s_renew_requested = 0;
 462        s->s_renew_seq = 0;
 463        INIT_LIST_HEAD(&s->s_caps);
 464        s->s_nr_caps = 0;
 465        s->s_trim_caps = 0;
 466        atomic_set(&s->s_ref, 1);
 467        INIT_LIST_HEAD(&s->s_waiting);
 468        INIT_LIST_HEAD(&s->s_unsafe);
 469        s->s_num_cap_releases = 0;
 470        s->s_cap_reconnect = 0;
 471        s->s_cap_iterator = NULL;
 472        INIT_LIST_HEAD(&s->s_cap_releases);
 473        INIT_LIST_HEAD(&s->s_cap_flushing);
 474
 475        dout("register_session mds%d\n", mds);
 476        if (mds >= mdsc->max_sessions) {
 477                int newmax = 1 << get_count_order(mds+1);
 478                struct ceph_mds_session **sa;
 479
 480                dout("register_session realloc to %d\n", newmax);
 481                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
 482                if (!sa)
 483                        goto fail_realloc;
 484                if (mdsc->sessions) {
 485                        memcpy(sa, mdsc->sessions,
 486                               mdsc->max_sessions * sizeof(void *));
 487                        kfree(mdsc->sessions);
 488                }
 489                mdsc->sessions = sa;
 490                mdsc->max_sessions = newmax;
 491        }
 492        mdsc->sessions[mds] = s;
 493        atomic_inc(&mdsc->num_sessions);
 494        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 495
 496        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
 497                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 498
 499        return s;
 500
 501fail_realloc:
 502        kfree(s);
 503        return ERR_PTR(-ENOMEM);
 504}
 505
 506/*
 507 * called under mdsc->mutex
 508 */
 509static void __unregister_session(struct ceph_mds_client *mdsc,
 510                               struct ceph_mds_session *s)
 511{
 512        dout("__unregister_session mds%d %p\n", s->s_mds, s);
 513        BUG_ON(mdsc->sessions[s->s_mds] != s);
 514        mdsc->sessions[s->s_mds] = NULL;
 515        ceph_con_close(&s->s_con);
 516        ceph_put_mds_session(s);
 517        atomic_dec(&mdsc->num_sessions);
 518}
 519
 520/*
 521 * drop session refs in request.
 522 *
 523 * should be last request ref, or hold mdsc->mutex
 524 */
 525static void put_request_session(struct ceph_mds_request *req)
 526{
 527        if (req->r_session) {
 528                ceph_put_mds_session(req->r_session);
 529                req->r_session = NULL;
 530        }
 531}
 532
 533void ceph_mdsc_release_request(struct kref *kref)
 534{
 535        struct ceph_mds_request *req = container_of(kref,
 536                                                    struct ceph_mds_request,
 537                                                    r_kref);
 538        destroy_reply_info(&req->r_reply_info);
 539        if (req->r_request)
 540                ceph_msg_put(req->r_request);
 541        if (req->r_reply)
 542                ceph_msg_put(req->r_reply);
 543        if (req->r_inode) {
 544                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
 545                iput(req->r_inode);
 546        }
 547        if (req->r_parent)
 548                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
 549        iput(req->r_target_inode);
 550        if (req->r_dentry)
 551                dput(req->r_dentry);
 552        if (req->r_old_dentry)
 553                dput(req->r_old_dentry);
 554        if (req->r_old_dentry_dir) {
 555                /*
 556                 * track (and drop pins for) r_old_dentry_dir
 557                 * separately, since r_old_dentry's d_parent may have
 558                 * changed between the dir mutex being dropped and
 559                 * this request being freed.
 560                 */
 561                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 562                                  CEPH_CAP_PIN);
 563                iput(req->r_old_dentry_dir);
 564        }
 565        kfree(req->r_path1);
 566        kfree(req->r_path2);
 567        if (req->r_pagelist)
 568                ceph_pagelist_release(req->r_pagelist);
 569        put_request_session(req);
 570        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 571        kfree(req);
 572}
 573
/*
 * Instantiate the rbtree helpers for struct ceph_mds_request, keyed by
 * r_tid and linked through r_node.  Presumably this is what provides the
 * insert_request(), lookup_request() and erase_request() calls used on
 * mdsc->request_tree in this file -- confirm against the macro definition.
 */
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
 575
 576/*
 577 * lookup session, bump ref if found.
 578 *
 579 * called under mdsc->mutex.
 580 */
 581static struct ceph_mds_request *
 582lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
 583{
 584        struct ceph_mds_request *req;
 585
 586        req = lookup_request(&mdsc->request_tree, tid);
 587        if (req)
 588                ceph_mdsc_get_request(req);
 589
 590        return req;
 591}
 592
 593/*
 594 * Register an in-flight request, and assign a tid.  Link to directory
 595 * are modifying (if any).
 596 *
 597 * Called under mdsc->mutex.
 598 */
 599static void __register_request(struct ceph_mds_client *mdsc,
 600                               struct ceph_mds_request *req,
 601                               struct inode *dir)
 602{
 603        req->r_tid = ++mdsc->last_tid;
 604        if (req->r_num_caps)
 605                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
 606                                  req->r_num_caps);
 607        dout("__register_request %p tid %lld\n", req, req->r_tid);
 608        ceph_mdsc_get_request(req);
 609        insert_request(&mdsc->request_tree, req);
 610
 611        req->r_uid = current_fsuid();
 612        req->r_gid = current_fsgid();
 613
 614        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
 615                mdsc->oldest_tid = req->r_tid;
 616
 617        if (dir) {
 618                ihold(dir);
 619                req->r_unsafe_dir = dir;
 620        }
 621}
 622
 623static void __unregister_request(struct ceph_mds_client *mdsc,
 624                                 struct ceph_mds_request *req)
 625{
 626        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 627
 628        /* Never leave an unregistered request on an unsafe list! */
 629        list_del_init(&req->r_unsafe_item);
 630
 631        if (req->r_tid == mdsc->oldest_tid) {
 632                struct rb_node *p = rb_next(&req->r_node);
 633                mdsc->oldest_tid = 0;
 634                while (p) {
 635                        struct ceph_mds_request *next_req =
 636                                rb_entry(p, struct ceph_mds_request, r_node);
 637                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
 638                                mdsc->oldest_tid = next_req->r_tid;
 639                                break;
 640                        }
 641                        p = rb_next(p);
 642                }
 643        }
 644
 645        erase_request(&mdsc->request_tree, req);
 646
 647        if (req->r_unsafe_dir  &&
 648            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 649                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
 650                spin_lock(&ci->i_unsafe_lock);
 651                list_del_init(&req->r_unsafe_dir_item);
 652                spin_unlock(&ci->i_unsafe_lock);
 653        }
 654        if (req->r_target_inode &&
 655            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 656                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
 657                spin_lock(&ci->i_unsafe_lock);
 658                list_del_init(&req->r_unsafe_target_item);
 659                spin_unlock(&ci->i_unsafe_lock);
 660        }
 661
 662        if (req->r_unsafe_dir) {
 663                iput(req->r_unsafe_dir);
 664                req->r_unsafe_dir = NULL;
 665        }
 666
 667        complete_all(&req->r_safe_completion);
 668
 669        ceph_mdsc_put_request(req);
 670}
 671
 672/*
 673 * Walk back up the dentry tree until we hit a dentry representing a
 674 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 675 * when calling this) to ensure that the objects won't disappear while we're
 676 * working with them. Once we hit a candidate dentry, we attempt to take a
 677 * reference to it, and return that as the result.
 678 */
 679static struct inode *get_nonsnap_parent(struct dentry *dentry)
 680{
 681        struct inode *inode = NULL;
 682
 683        while (dentry && !IS_ROOT(dentry)) {
 684                inode = d_inode_rcu(dentry);
 685                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
 686                        break;
 687                dentry = dentry->d_parent;
 688        }
 689        if (inode)
 690                inode = igrab(inode);
 691        return inode;
 692}
 693
 694/*
 695 * Choose mds to send request to next.  If there is a hint set in the
 696 * request (e.g., due to a prior forward hint from the mds), use that.
 697 * Otherwise, consult frag tree and/or caps to identify the
 698 * appropriate mds.  If all else fails, choose randomly.
 699 *
 700 * Called under mdsc->mutex.
 701 */
 702static int __choose_mds(struct ceph_mds_client *mdsc,
 703                        struct ceph_mds_request *req)
 704{
 705        struct inode *inode;
 706        struct ceph_inode_info *ci;
 707        struct ceph_cap *cap;
 708        int mode = req->r_direct_mode;
 709        int mds = -1;
 710        u32 hash = req->r_direct_hash;
 711        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
 712
 713        /*
 714         * is there a specific mds we should try?  ignore hint if we have
 715         * no session and the mds is not up (active or recovering).
 716         */
 717        if (req->r_resend_mds >= 0 &&
 718            (__have_session(mdsc, req->r_resend_mds) ||
 719             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
 720                dout("choose_mds using resend_mds mds%d\n",
 721                     req->r_resend_mds);
 722                return req->r_resend_mds;
 723        }
 724
 725        if (mode == USE_RANDOM_MDS)
 726                goto random;
 727
 728        inode = NULL;
 729        if (req->r_inode) {
 730                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
 731                        inode = req->r_inode;
 732                        ihold(inode);
 733                } else {
 734                        /* req->r_dentry is non-null for LSSNAP request */
 735                        rcu_read_lock();
 736                        inode = get_nonsnap_parent(req->r_dentry);
 737                        rcu_read_unlock();
 738                        dout("__choose_mds using snapdir's parent %p\n", inode);
 739                }
 740        } else if (req->r_dentry) {
 741                /* ignore race with rename; old or new d_parent is okay */
 742                struct dentry *parent;
 743                struct inode *dir;
 744
 745                rcu_read_lock();
 746                parent = req->r_dentry->d_parent;
 747                dir = req->r_parent ? : d_inode_rcu(parent);
 748
 749                if (!dir || dir->i_sb != mdsc->fsc->sb) {
 750                        /*  not this fs or parent went negative */
 751                        inode = d_inode(req->r_dentry);
 752                        if (inode)
 753                                ihold(inode);
 754                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
 755                        /* direct snapped/virtual snapdir requests
 756                         * based on parent dir inode */
 757                        inode = get_nonsnap_parent(parent);
 758                        dout("__choose_mds using nonsnap parent %p\n", inode);
 759                } else {
 760                        /* dentry target */
 761                        inode = req->r_dentry->d_inode;
 762                        if (!inode || mode == USE_AUTH_MDS) {
 763                                /* dir + name */
 764                                inode = igrab(dir);
 765                                hash = ceph_dentry_hash(dir, req->r_dentry);
 766                                is_hash = true;
 767                        } else {
 768                                ihold(inode);
 769                        }
 770                }
 771                rcu_read_unlock();
 772        }
 773
 774        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
 775             (int)hash, mode);
 776        if (!inode)
 777                goto random;
 778        ci = ceph_inode(inode);
 779
 780        if (is_hash && S_ISDIR(inode->i_mode)) {
 781                struct ceph_inode_frag frag;
 782                int found;
 783
 784                ceph_choose_frag(ci, hash, &frag, &found);
 785                if (found) {
 786                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
 787                                u8 r;
 788
 789                                /* choose a random replica */
 790                                get_random_bytes(&r, 1);
 791                                r %= frag.ndist;
 792                                mds = frag.dist[r];
 793                                dout("choose_mds %p %llx.%llx "
 794                                     "frag %u mds%d (%d/%d)\n",
 795                                     inode, ceph_vinop(inode),
 796                                     frag.frag, mds,
 797                                     (int)r, frag.ndist);
 798                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 799                                    CEPH_MDS_STATE_ACTIVE)
 800                                        goto out;
 801                        }
 802
 803                        /* since this file/dir wasn't known to be
 804                         * replicated, then we want to look for the
 805                         * authoritative mds. */
 806                        mode = USE_AUTH_MDS;
 807                        if (frag.mds >= 0) {
 808                                /* choose auth mds */
 809                                mds = frag.mds;
 810                                dout("choose_mds %p %llx.%llx "
 811                                     "frag %u mds%d (auth)\n",
 812                                     inode, ceph_vinop(inode), frag.frag, mds);
 813                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 814                                    CEPH_MDS_STATE_ACTIVE)
 815                                        goto out;
 816                        }
 817                }
 818        }
 819
 820        spin_lock(&ci->i_ceph_lock);
 821        cap = NULL;
 822        if (mode == USE_AUTH_MDS)
 823                cap = ci->i_auth_cap;
 824        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
 825                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 826        if (!cap) {
 827                spin_unlock(&ci->i_ceph_lock);
 828                iput(inode);
 829                goto random;
 830        }
 831        mds = cap->session->s_mds;
 832        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
 833             inode, ceph_vinop(inode), mds,
 834             cap == ci->i_auth_cap ? "auth " : "", cap);
 835        spin_unlock(&ci->i_ceph_lock);
 836out:
 837        iput(inode);
 838        return mds;
 839
 840random:
 841        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
 842        dout("choose_mds chose random mds%d\n", mds);
 843        return mds;
 844}
 845
 846
 847/*
 848 * session messages
 849 */
 850static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 851{
 852        struct ceph_msg *msg;
 853        struct ceph_mds_session_head *h;
 854
 855        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
 856                           false);
 857        if (!msg) {
 858                pr_err("create_session_msg ENOMEM creating msg\n");
 859                return NULL;
 860        }
 861        h = msg->front.iov_base;
 862        h->op = cpu_to_le32(op);
 863        h->seq = cpu_to_le64(seq);
 864
 865        return msg;
 866}
 867
 868/*
 869 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 870 * to include additional client metadata fields.
 871 */
 872static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 873{
 874        struct ceph_msg *msg;
 875        struct ceph_mds_session_head *h;
 876        int i = -1;
 877        int metadata_bytes = 0;
 878        int metadata_key_count = 0;
 879        struct ceph_options *opt = mdsc->fsc->client->options;
 880        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
 881        void *p;
 882
 883        const char* metadata[][2] = {
 884                {"hostname", mdsc->nodename},
 885                {"kernel_version", init_utsname()->release},
 886                {"entity_id", opt->name ? : ""},
 887                {"root", fsopt->server_path ? : "/"},
 888                {NULL, NULL}
 889        };
 890
 891        /* Calculate serialized length of metadata */
 892        metadata_bytes = 4;  /* map length */
 893        for (i = 0; metadata[i][0]; ++i) {
 894                metadata_bytes += 8 + strlen(metadata[i][0]) +
 895                        strlen(metadata[i][1]);
 896                metadata_key_count++;
 897        }
 898
 899        /* Allocate the message */
 900        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
 901                           GFP_NOFS, false);
 902        if (!msg) {
 903                pr_err("create_session_msg ENOMEM creating msg\n");
 904                return NULL;
 905        }
 906        h = msg->front.iov_base;
 907        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
 908        h->seq = cpu_to_le64(seq);
 909
 910        /*
 911         * Serialize client metadata into waiting buffer space, using
 912         * the format that userspace expects for map<string, string>
 913         *
 914         * ClientSession messages with metadata are v2
 915         */
 916        msg->hdr.version = cpu_to_le16(2);
 917        msg->hdr.compat_version = cpu_to_le16(1);
 918
 919        /* The write pointer, following the session_head structure */
 920        p = msg->front.iov_base + sizeof(*h);
 921
 922        /* Number of entries in the map */
 923        ceph_encode_32(&p, metadata_key_count);
 924
 925        /* Two length-prefixed strings for each entry in the map */
 926        for (i = 0; metadata[i][0]; ++i) {
 927                size_t const key_len = strlen(metadata[i][0]);
 928                size_t const val_len = strlen(metadata[i][1]);
 929
 930                ceph_encode_32(&p, key_len);
 931                memcpy(p, metadata[i][0], key_len);
 932                p += key_len;
 933                ceph_encode_32(&p, val_len);
 934                memcpy(p, metadata[i][1], val_len);
 935                p += val_len;
 936        }
 937
 938        return msg;
 939}
 940
 941/*
 942 * send session open request.
 943 *
 944 * called under mdsc->mutex
 945 */
 946static int __open_session(struct ceph_mds_client *mdsc,
 947                          struct ceph_mds_session *session)
 948{
 949        struct ceph_msg *msg;
 950        int mstate;
 951        int mds = session->s_mds;
 952
 953        /* wait for mds to go active? */
 954        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
 955        dout("open_session to mds%d (%s)\n", mds,
 956             ceph_mds_state_name(mstate));
 957        session->s_state = CEPH_MDS_SESSION_OPENING;
 958        session->s_renew_requested = jiffies;
 959
 960        /* send connect message */
 961        msg = create_session_open_msg(mdsc, session->s_seq);
 962        if (!msg)
 963                return -ENOMEM;
 964        ceph_con_send(&session->s_con, msg);
 965        return 0;
 966}
 967
 968/*
 969 * open sessions for any export targets for the given mds
 970 *
 971 * called under mdsc->mutex
 972 */
 973static struct ceph_mds_session *
 974__open_export_target_session(struct ceph_mds_client *mdsc, int target)
 975{
 976        struct ceph_mds_session *session;
 977
 978        session = __ceph_lookup_mds_session(mdsc, target);
 979        if (!session) {
 980                session = register_session(mdsc, target);
 981                if (IS_ERR(session))
 982                        return session;
 983        }
 984        if (session->s_state == CEPH_MDS_SESSION_NEW ||
 985            session->s_state == CEPH_MDS_SESSION_CLOSING)
 986                __open_session(mdsc, session);
 987
 988        return session;
 989}
 990
 991struct ceph_mds_session *
 992ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
 993{
 994        struct ceph_mds_session *session;
 995
 996        dout("open_export_target_session to mds%d\n", target);
 997
 998        mutex_lock(&mdsc->mutex);
 999        session = __open_export_target_session(mdsc, target);
1000        mutex_unlock(&mdsc->mutex);
1001
1002        return session;
1003}
1004
1005static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1006                                          struct ceph_mds_session *session)
1007{
1008        struct ceph_mds_info *mi;
1009        struct ceph_mds_session *ts;
1010        int i, mds = session->s_mds;
1011
1012        if (mds >= mdsc->mdsmap->m_num_mds)
1013                return;
1014
1015        mi = &mdsc->mdsmap->m_info[mds];
1016        dout("open_export_target_sessions for mds%d (%d targets)\n",
1017             session->s_mds, mi->num_export_targets);
1018
1019        for (i = 0; i < mi->num_export_targets; i++) {
1020                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1021                if (!IS_ERR(ts))
1022                        ceph_put_mds_session(ts);
1023        }
1024}
1025
1026void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1027                                           struct ceph_mds_session *session)
1028{
1029        mutex_lock(&mdsc->mutex);
1030        __open_export_target_sessions(mdsc, session);
1031        mutex_unlock(&mdsc->mutex);
1032}
1033
1034/*
1035 * session caps
1036 */
1037
1038/* caller holds s_cap_lock, we drop it */
1039static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
1040                                 struct ceph_mds_session *session)
1041        __releases(session->s_cap_lock)
1042{
1043        LIST_HEAD(tmp_list);
1044        list_splice_init(&session->s_cap_releases, &tmp_list);
1045        session->s_num_cap_releases = 0;
1046        spin_unlock(&session->s_cap_lock);
1047
1048        dout("cleanup_cap_releases mds%d\n", session->s_mds);
1049        while (!list_empty(&tmp_list)) {
1050                struct ceph_cap *cap;
1051                /* zero out the in-progress message */
1052                cap = list_first_entry(&tmp_list,
1053                                        struct ceph_cap, session_caps);
1054                list_del(&cap->session_caps);
1055                ceph_put_cap(mdsc, cap);
1056        }
1057}
1058
1059static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1060                                     struct ceph_mds_session *session)
1061{
1062        struct ceph_mds_request *req;
1063        struct rb_node *p;
1064
1065        dout("cleanup_session_requests mds%d\n", session->s_mds);
1066        mutex_lock(&mdsc->mutex);
1067        while (!list_empty(&session->s_unsafe)) {
1068                req = list_first_entry(&session->s_unsafe,
1069                                       struct ceph_mds_request, r_unsafe_item);
1070                pr_warn_ratelimited(" dropping unsafe request %llu\n",
1071                                    req->r_tid);
1072                __unregister_request(mdsc, req);
1073        }
1074        /* zero r_attempts, so kick_requests() will re-send requests */
1075        p = rb_first(&mdsc->request_tree);
1076        while (p) {
1077                req = rb_entry(p, struct ceph_mds_request, r_node);
1078                p = rb_next(p);
1079                if (req->r_session &&
1080                    req->r_session->s_mds == session->s_mds)
1081                        req->r_attempts = 0;
1082        }
1083        mutex_unlock(&mdsc->mutex);
1084}
1085
1086/*
1087 * Helper to safely iterate over all caps associated with a session, with
1088 * special care taken to handle a racing __ceph_remove_cap().
1089 *
1090 * Caller must hold session s_mutex.
1091 */
1092static int iterate_session_caps(struct ceph_mds_session *session,
1093                                 int (*cb)(struct inode *, struct ceph_cap *,
1094                                            void *), void *arg)
1095{
1096        struct list_head *p;
1097        struct ceph_cap *cap;
1098        struct inode *inode, *last_inode = NULL;
1099        struct ceph_cap *old_cap = NULL;
1100        int ret;
1101
1102        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1103        spin_lock(&session->s_cap_lock);
1104        p = session->s_caps.next;
1105        while (p != &session->s_caps) {
1106                cap = list_entry(p, struct ceph_cap, session_caps);
1107                inode = igrab(&cap->ci->vfs_inode);
1108                if (!inode) {
1109                        p = p->next;
1110                        continue;
1111                }
1112                session->s_cap_iterator = cap;
1113                spin_unlock(&session->s_cap_lock);
1114
1115                if (last_inode) {
1116                        iput(last_inode);
1117                        last_inode = NULL;
1118                }
1119                if (old_cap) {
1120                        ceph_put_cap(session->s_mdsc, old_cap);
1121                        old_cap = NULL;
1122                }
1123
1124                ret = cb(inode, cap, arg);
1125                last_inode = inode;
1126
1127                spin_lock(&session->s_cap_lock);
1128                p = p->next;
1129                if (!cap->ci) {
1130                        dout("iterate_session_caps  finishing cap %p removal\n",
1131                             cap);
1132                        BUG_ON(cap->session != session);
1133                        cap->session = NULL;
1134                        list_del_init(&cap->session_caps);
1135                        session->s_nr_caps--;
1136                        if (cap->queue_release) {
1137                                list_add_tail(&cap->session_caps,
1138                                              &session->s_cap_releases);
1139                                session->s_num_cap_releases++;
1140                        } else {
1141                                old_cap = cap;  /* put_cap it w/o locks held */
1142                        }
1143                }
1144                if (ret < 0)
1145                        goto out;
1146        }
1147        ret = 0;
1148out:
1149        session->s_cap_iterator = NULL;
1150        spin_unlock(&session->s_cap_lock);
1151
1152        iput(last_inode);
1153        if (old_cap)
1154                ceph_put_cap(session->s_mdsc, old_cap);
1155
1156        return ret;
1157}
1158
1159static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1160                                  void *arg)
1161{
1162        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1163        struct ceph_inode_info *ci = ceph_inode(inode);
1164        LIST_HEAD(to_remove);
1165        bool drop = false;
1166        bool invalidate = false;
1167
1168        dout("removing cap %p, ci is %p, inode is %p\n",
1169             cap, ci, &ci->vfs_inode);
1170        spin_lock(&ci->i_ceph_lock);
1171        __ceph_remove_cap(cap, false);
1172        if (!ci->i_auth_cap) {
1173                struct ceph_cap_flush *cf;
1174                struct ceph_mds_client *mdsc = fsc->mdsc;
1175
1176                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1177
1178                if (ci->i_wrbuffer_ref > 0 &&
1179                    ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1180                        invalidate = true;
1181
1182                while (!list_empty(&ci->i_cap_flush_list)) {
1183                        cf = list_first_entry(&ci->i_cap_flush_list,
1184                                              struct ceph_cap_flush, i_list);
1185                        list_move(&cf->i_list, &to_remove);
1186                }
1187
1188                spin_lock(&mdsc->cap_dirty_lock);
1189
1190                list_for_each_entry(cf, &to_remove, i_list)
1191                        list_del(&cf->g_list);
1192
1193                if (!list_empty(&ci->i_dirty_item)) {
1194                        pr_warn_ratelimited(
1195                                " dropping dirty %s state for %p %lld\n",
1196                                ceph_cap_string(ci->i_dirty_caps),
1197                                inode, ceph_ino(inode));
1198                        ci->i_dirty_caps = 0;
1199                        list_del_init(&ci->i_dirty_item);
1200                        drop = true;
1201                }
1202                if (!list_empty(&ci->i_flushing_item)) {
1203                        pr_warn_ratelimited(
1204                                " dropping dirty+flushing %s state for %p %lld\n",
1205                                ceph_cap_string(ci->i_flushing_caps),
1206                                inode, ceph_ino(inode));
1207                        ci->i_flushing_caps = 0;
1208                        list_del_init(&ci->i_flushing_item);
1209                        mdsc->num_cap_flushing--;
1210                        drop = true;
1211                }
1212                spin_unlock(&mdsc->cap_dirty_lock);
1213
1214                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1215                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1216                        ci->i_prealloc_cap_flush = NULL;
1217                }
1218        }
1219        spin_unlock(&ci->i_ceph_lock);
1220        while (!list_empty(&to_remove)) {
1221                struct ceph_cap_flush *cf;
1222                cf = list_first_entry(&to_remove,
1223                                      struct ceph_cap_flush, i_list);
1224                list_del(&cf->i_list);
1225                ceph_free_cap_flush(cf);
1226        }
1227
1228        wake_up_all(&ci->i_cap_wq);
1229        if (invalidate)
1230                ceph_queue_invalidate(inode);
1231        if (drop)
1232                iput(inode);
1233        return 0;
1234}
1235
1236/*
1237 * caller must hold session s_mutex
1238 */
1239static void remove_session_caps(struct ceph_mds_session *session)
1240{
1241        struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1242        struct super_block *sb = fsc->sb;
1243        dout("remove_session_caps on %p\n", session);
1244        iterate_session_caps(session, remove_session_caps_cb, fsc);
1245
1246        wake_up_all(&fsc->mdsc->cap_flushing_wq);
1247
1248        spin_lock(&session->s_cap_lock);
1249        if (session->s_nr_caps > 0) {
1250                struct inode *inode;
1251                struct ceph_cap *cap, *prev = NULL;
1252                struct ceph_vino vino;
1253                /*
1254                 * iterate_session_caps() skips inodes that are being
1255                 * deleted, we need to wait until deletions are complete.
1256                 * __wait_on_freeing_inode() is designed for the job,
1257                 * but it is not exported, so use lookup inode function
1258                 * to access it.
1259                 */
1260                while (!list_empty(&session->s_caps)) {
1261                        cap = list_entry(session->s_caps.next,
1262                                         struct ceph_cap, session_caps);
1263                        if (cap == prev)
1264                                break;
1265                        prev = cap;
1266                        vino = cap->ci->i_vino;
1267                        spin_unlock(&session->s_cap_lock);
1268
1269                        inode = ceph_find_inode(sb, vino);
1270                        iput(inode);
1271
1272                        spin_lock(&session->s_cap_lock);
1273                }
1274        }
1275
1276        // drop cap expires and unlock s_cap_lock
1277        cleanup_cap_releases(session->s_mdsc, session);
1278
1279        BUG_ON(session->s_nr_caps > 0);
1280        BUG_ON(!list_empty(&session->s_cap_flushing));
1281}
1282
1283/*
1284 * wake up any threads waiting on this session's caps.  if the cap is
1285 * old (didn't get renewed on the client reconnect), remove it now.
1286 *
1287 * caller must hold s_mutex.
1288 */
1289static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1290                              void *arg)
1291{
1292        struct ceph_inode_info *ci = ceph_inode(inode);
1293
1294        if (arg) {
1295                spin_lock(&ci->i_ceph_lock);
1296                ci->i_wanted_max_size = 0;
1297                ci->i_requested_max_size = 0;
1298                spin_unlock(&ci->i_ceph_lock);
1299        }
1300        wake_up_all(&ci->i_cap_wq);
1301        return 0;
1302}
1303
1304static void wake_up_session_caps(struct ceph_mds_session *session,
1305                                 int reconnect)
1306{
1307        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1308        iterate_session_caps(session, wake_up_session_cb,
1309                             (void *)(unsigned long)reconnect);
1310}
1311
1312/*
1313 * Send periodic message to MDS renewing all currently held caps.  The
1314 * ack will reset the expiration for all caps from this session.
1315 *
1316 * caller holds s_mutex
1317 */
1318static int send_renew_caps(struct ceph_mds_client *mdsc,
1319                           struct ceph_mds_session *session)
1320{
1321        struct ceph_msg *msg;
1322        int state;
1323
1324        if (time_after_eq(jiffies, session->s_cap_ttl) &&
1325            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1326                pr_info("mds%d caps stale\n", session->s_mds);
1327        session->s_renew_requested = jiffies;
1328
1329        /* do not try to renew caps until a recovering mds has reconnected
1330         * with its clients. */
1331        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1332        if (state < CEPH_MDS_STATE_RECONNECT) {
1333                dout("send_renew_caps ignoring mds%d (%s)\n",
1334                     session->s_mds, ceph_mds_state_name(state));
1335                return 0;
1336        }
1337
1338        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1339                ceph_mds_state_name(state));
1340        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1341                                 ++session->s_renew_seq);
1342        if (!msg)
1343                return -ENOMEM;
1344        ceph_con_send(&session->s_con, msg);
1345        return 0;
1346}
1347
1348static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1349                             struct ceph_mds_session *session, u64 seq)
1350{
1351        struct ceph_msg *msg;
1352
1353        dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1354             session->s_mds, ceph_session_state_name(session->s_state), seq);
1355        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1356        if (!msg)
1357                return -ENOMEM;
1358        ceph_con_send(&session->s_con, msg);
1359        return 0;
1360}
1361
1362
1363/*
1364 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1365 *
1366 * Called under session->s_mutex
1367 */
1368static void renewed_caps(struct ceph_mds_client *mdsc,
1369                         struct ceph_mds_session *session, int is_renew)
1370{
1371        int was_stale;
1372        int wake = 0;
1373
1374        spin_lock(&session->s_cap_lock);
1375        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1376
1377        session->s_cap_ttl = session->s_renew_requested +
1378                mdsc->mdsmap->m_session_timeout*HZ;
1379
1380        if (was_stale) {
1381                if (time_before(jiffies, session->s_cap_ttl)) {
1382                        pr_info("mds%d caps renewed\n", session->s_mds);
1383                        wake = 1;
1384                } else {
1385                        pr_info("mds%d caps still stale\n", session->s_mds);
1386                }
1387        }
1388        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1389             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1390             time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1391        spin_unlock(&session->s_cap_lock);
1392
1393        if (wake)
1394                wake_up_session_caps(session, 0);
1395}
1396
1397/*
1398 * send a session close request
1399 */
1400static int request_close_session(struct ceph_mds_client *mdsc,
1401                                 struct ceph_mds_session *session)
1402{
1403        struct ceph_msg *msg;
1404
1405        dout("request_close_session mds%d state %s seq %lld\n",
1406             session->s_mds, ceph_session_state_name(session->s_state),
1407             session->s_seq);
1408        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1409        if (!msg)
1410                return -ENOMEM;
1411        ceph_con_send(&session->s_con, msg);
1412        return 1;
1413}
1414
1415/*
1416 * Called with s_mutex held.
1417 */
1418static int __close_session(struct ceph_mds_client *mdsc,
1419                         struct ceph_mds_session *session)
1420{
1421        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1422                return 0;
1423        session->s_state = CEPH_MDS_SESSION_CLOSING;
1424        return request_close_session(mdsc, session);
1425}
1426
1427/*
1428 * Trim old(er) caps.
1429 *
1430 * Because we can't cache an inode without one or more caps, we do
1431 * this indirectly: if a cap is unused, we prune its aliases, at which
1432 * point the inode will hopefully get dropped to.
1433 *
1434 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1435 * memory pressure from the MDS, though, so it needn't be perfect.
1436 */
1437static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1438{
1439        struct ceph_mds_session *session = arg;
1440        struct ceph_inode_info *ci = ceph_inode(inode);
1441        int used, wanted, oissued, mine;
1442
1443        if (session->s_trim_caps <= 0)
1444                return -1;
1445
1446        spin_lock(&ci->i_ceph_lock);
1447        mine = cap->issued | cap->implemented;
1448        used = __ceph_caps_used(ci);
1449        wanted = __ceph_caps_file_wanted(ci);
1450        oissued = __ceph_caps_issued_other(ci, cap);
1451
1452        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1453             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1454             ceph_cap_string(used), ceph_cap_string(wanted));
1455        if (cap == ci->i_auth_cap) {
1456                if (ci->i_dirty_caps || ci->i_flushing_caps ||
1457                    !list_empty(&ci->i_cap_snaps))
1458                        goto out;
1459                if ((used | wanted) & CEPH_CAP_ANY_WR)
1460                        goto out;
1461        }
1462        /* The inode has cached pages, but it's no longer used.
1463         * we can safely drop it */
1464        if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1465            !(oissued & CEPH_CAP_FILE_CACHE)) {
1466          used = 0;
1467          oissued = 0;
1468        }
1469        if ((used | wanted) & ~oissued & mine)
1470                goto out;   /* we need these caps */
1471
1472        session->s_trim_caps--;
1473        if (oissued) {
1474                /* we aren't the only cap.. just remove us */
1475                __ceph_remove_cap(cap, true);
1476        } else {
1477                /* try dropping referring dentries */
1478                spin_unlock(&ci->i_ceph_lock);
1479                d_prune_aliases(inode);
1480                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1481                     inode, cap, atomic_read(&inode->i_count));
1482                return 0;
1483        }
1484
1485out:
1486        spin_unlock(&ci->i_ceph_lock);
1487        return 0;
1488}
1489
1490/*
1491 * Trim session cap count down to some max number.
1492 */
1493static int trim_caps(struct ceph_mds_client *mdsc,
1494                     struct ceph_mds_session *session,
1495                     int max_caps)
1496{
1497        int trim_caps = session->s_nr_caps - max_caps;
1498
1499        dout("trim_caps mds%d start: %d / %d, trim %d\n",
1500             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1501        if (trim_caps > 0) {
1502                session->s_trim_caps = trim_caps;
1503                iterate_session_caps(session, trim_caps_cb, session);
1504                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1505                     session->s_mds, session->s_nr_caps, max_caps,
1506                        trim_caps - session->s_trim_caps);
1507                session->s_trim_caps = 0;
1508        }
1509
1510        ceph_send_cap_releases(mdsc, session);
1511        return 0;
1512}
1513
1514static int check_caps_flush(struct ceph_mds_client *mdsc,
1515                            u64 want_flush_tid)
1516{
1517        int ret = 1;
1518
1519        spin_lock(&mdsc->cap_dirty_lock);
1520        if (!list_empty(&mdsc->cap_flush_list)) {
1521                struct ceph_cap_flush *cf =
1522                        list_first_entry(&mdsc->cap_flush_list,
1523                                         struct ceph_cap_flush, g_list);
1524                if (cf->tid <= want_flush_tid) {
1525                        dout("check_caps_flush still flushing tid "
1526                             "%llu <= %llu\n", cf->tid, want_flush_tid);
1527                        ret = 0;
1528                }
1529        }
1530        spin_unlock(&mdsc->cap_dirty_lock);
1531        return ret;
1532}
1533
1534/*
1535 * flush all dirty inode data to disk.
1536 *
1537 * returns true if we've flushed through want_flush_tid
1538 */
1539static void wait_caps_flush(struct ceph_mds_client *mdsc,
1540                            u64 want_flush_tid)
1541{
1542        dout("check_caps_flush want %llu\n", want_flush_tid);
1543
1544        wait_event(mdsc->cap_flushing_wq,
1545                   check_caps_flush(mdsc, want_flush_tid));
1546
1547        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1548}
1549
1550/*
1551 * called under s_mutex
1552 */
1553void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1554                            struct ceph_mds_session *session)
1555{
1556        struct ceph_msg *msg = NULL;
1557        struct ceph_mds_cap_release *head;
1558        struct ceph_mds_cap_item *item;
1559        struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1560        struct ceph_cap *cap;
1561        LIST_HEAD(tmp_list);
1562        int num_cap_releases;
1563        __le32  barrier, *cap_barrier;
1564
1565        down_read(&osdc->lock);
1566        barrier = cpu_to_le32(osdc->epoch_barrier);
1567        up_read(&osdc->lock);
1568
1569        spin_lock(&session->s_cap_lock);
1570again:
1571        list_splice_init(&session->s_cap_releases, &tmp_list);
1572        num_cap_releases = session->s_num_cap_releases;
1573        session->s_num_cap_releases = 0;
1574        spin_unlock(&session->s_cap_lock);
1575
1576        while (!list_empty(&tmp_list)) {
1577                if (!msg) {
1578                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1579                                        PAGE_CACHE_SIZE, GFP_NOFS, false);
1580                        if (!msg)
1581                                goto out_err;
1582                        head = msg->front.iov_base;
1583                        head->num = cpu_to_le32(0);
1584                        msg->front.iov_len = sizeof(*head);
1585
1586                        msg->hdr.version = cpu_to_le16(2);
1587                        msg->hdr.compat_version = cpu_to_le16(1);
1588                }
1589
1590                cap = list_first_entry(&tmp_list, struct ceph_cap,
1591                                        session_caps);
1592                list_del(&cap->session_caps);
1593                num_cap_releases--;
1594
1595                head = msg->front.iov_base;
1596                le32_add_cpu(&head->num, 1);
1597                item = msg->front.iov_base + msg->front.iov_len;
1598                item->ino = cpu_to_le64(cap->cap_ino);
1599                item->cap_id = cpu_to_le64(cap->cap_id);
1600                item->migrate_seq = cpu_to_le32(cap->mseq);
1601                item->seq = cpu_to_le32(cap->issue_seq);
1602                msg->front.iov_len += sizeof(*item);
1603
1604                ceph_put_cap(mdsc, cap);
1605
1606                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1607                        // Append cap_barrier field
1608                        cap_barrier = msg->front.iov_base + msg->front.iov_len;
1609                        *cap_barrier = barrier;
1610                        msg->front.iov_len += sizeof(*cap_barrier);
1611
1612                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1613                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1614                        ceph_con_send(&session->s_con, msg);
1615                        msg = NULL;
1616                }
1617        }
1618
1619        BUG_ON(num_cap_releases != 0);
1620
1621        spin_lock(&session->s_cap_lock);
1622        if (!list_empty(&session->s_cap_releases))
1623                goto again;
1624        spin_unlock(&session->s_cap_lock);
1625
1626        if (msg) {
1627                // Append cap_barrier field
1628                cap_barrier = msg->front.iov_base + msg->front.iov_len;
1629                *cap_barrier = barrier;
1630                msg->front.iov_len += sizeof(*cap_barrier);
1631
1632                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1633                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1634                ceph_con_send(&session->s_con, msg);
1635        }
1636        return;
1637out_err:
1638        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1639                session->s_mds);
1640        spin_lock(&session->s_cap_lock);
1641        list_splice(&tmp_list, &session->s_cap_releases);
1642        session->s_num_cap_releases += num_cap_releases;
1643        spin_unlock(&session->s_cap_lock);
1644}
1645
1646/*
1647 * requests
1648 */
1649
1650int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1651                                    struct inode *dir)
1652{
1653        struct ceph_inode_info *ci = ceph_inode(dir);
1654        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1655        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1656        size_t size = sizeof(struct ceph_mds_reply_dir_entry);
1657        int order, num_entries;
1658
1659        spin_lock(&ci->i_ceph_lock);
1660        num_entries = ci->i_files + ci->i_subdirs;
1661        spin_unlock(&ci->i_ceph_lock);
1662        num_entries = max(num_entries, 1);
1663        num_entries = min(num_entries, opt->max_readdir);
1664
1665        order = get_order(size * num_entries);
1666        while (order >= 0) {
1667                rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
1668                                                             __GFP_NOWARN,
1669                                                             order);
1670                if (rinfo->dir_entries)
1671                        break;
1672                order--;
1673        }
1674        if (!rinfo->dir_entries)
1675                return -ENOMEM;
1676
1677        num_entries = (PAGE_SIZE << order) / size;
1678        num_entries = min(num_entries, opt->max_readdir);
1679
1680        rinfo->dir_buf_size = PAGE_SIZE << order;
1681        req->r_num_caps = num_entries + 1;
1682        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1683        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1684        return 0;
1685}
1686
1687/*
1688 * Create an mds request.
1689 */
1690struct ceph_mds_request *
1691ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1692{
1693        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1694
1695        if (!req)
1696                return ERR_PTR(-ENOMEM);
1697
1698        mutex_init(&req->r_fill_mutex);
1699        req->r_mdsc = mdsc;
1700        req->r_started = jiffies;
1701        req->r_resend_mds = -1;
1702        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1703        INIT_LIST_HEAD(&req->r_unsafe_target_item);
1704        req->r_fmode = -1;
1705        kref_init(&req->r_kref);
1706        RB_CLEAR_NODE(&req->r_node);
1707        INIT_LIST_HEAD(&req->r_wait);
1708        init_completion(&req->r_completion);
1709        init_completion(&req->r_safe_completion);
1710        INIT_LIST_HEAD(&req->r_unsafe_item);
1711
1712        req->r_stamp = current_fs_time(mdsc->fsc->sb);
1713
1714        req->r_op = op;
1715        req->r_direct_mode = mode;
1716        return req;
1717}
1718
/*
 * Return the oldest request (the one with the lowest tid) in the
 * request tree, or NULL if the tree is empty.
 *
 * called under mdsc->mutex.
 */
1724static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1725{
1726        if (RB_EMPTY_ROOT(&mdsc->request_tree))
1727                return NULL;
1728        return rb_entry(rb_first(&mdsc->request_tree),
1729                        struct ceph_mds_request, r_node);
1730}
1731
1732static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1733{
1734        return mdsc->oldest_tid;
1735}
1736
1737/*
1738 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1739 * on build_path_from_dentry in fs/cifs/dir.c.
1740 *
1741 * If @stop_on_nosnap, generate path relative to the first non-snapped
1742 * inode.
1743 *
1744 * Encode hidden .snap dirs as a double /, i.e.
1745 *   foo/.snap/bar -> foo//bar
1746 */
1747char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1748                           int stop_on_nosnap)
1749{
1750        struct dentry *temp;
1751        char *path;
1752        int len, pos;
1753        unsigned seq;
1754
1755        if (!dentry)
1756                return ERR_PTR(-EINVAL);
1757
1758retry:
1759        len = 0;
1760        seq = read_seqbegin(&rename_lock);
1761        rcu_read_lock();
1762        for (temp = dentry; !IS_ROOT(temp);) {
1763                struct inode *inode = temp->d_inode;
1764                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1765                        len++;  /* slash only */
1766                else if (stop_on_nosnap && inode &&
1767                         ceph_snap(inode) == CEPH_NOSNAP)
1768                        break;
1769                else
1770                        len += 1 + temp->d_name.len;
1771                temp = temp->d_parent;
1772        }
1773        rcu_read_unlock();
1774        if (len)
1775                len--;  /* no leading '/' */
1776
1777        path = kmalloc(len+1, GFP_NOFS);
1778        if (!path)
1779                return ERR_PTR(-ENOMEM);
1780        pos = len;
1781        path[pos] = 0;  /* trailing null */
1782        rcu_read_lock();
1783        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1784                struct inode *inode;
1785
1786                spin_lock(&temp->d_lock);
1787                inode = temp->d_inode;
1788                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1789                        dout("build_path path+%d: %p SNAPDIR\n",
1790                             pos, temp);
1791                } else if (stop_on_nosnap && inode &&
1792                           ceph_snap(inode) == CEPH_NOSNAP) {
1793                        spin_unlock(&temp->d_lock);
1794                        break;
1795                } else {
1796                        pos -= temp->d_name.len;
1797                        if (pos < 0) {
1798                                spin_unlock(&temp->d_lock);
1799                                break;
1800                        }
1801                        strncpy(path + pos, temp->d_name.name,
1802                                temp->d_name.len);
1803                }
1804                spin_unlock(&temp->d_lock);
1805                if (pos)
1806                        path[--pos] = '/';
1807                temp = temp->d_parent;
1808        }
1809        rcu_read_unlock();
1810        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1811                pr_err("build_path did not end path lookup where "
1812                       "expected, namelen is %d, pos is %d\n", len, pos);
1813                /* presumably this is only possible if racing with a
1814                   rename of one of the parent directories (we can not
1815                   lock the dentries above us to prevent this, but
1816                   retrying should be harmless) */
1817                kfree(path);
1818                goto retry;
1819        }
1820
1821        *base = ceph_ino(temp->d_inode);
1822        *plen = len;
1823        dout("build_path on %p %d built %llx '%.*s'\n",
1824             dentry, d_count(dentry), *base, len, path);
1825        return path;
1826}
1827
1828static int build_dentry_path(struct dentry *dentry, struct inode *dir,
1829                             const char **ppath, int *ppathlen, u64 *pino,
1830                             int *pfreepath)
1831{
1832        char *path;
1833
1834        rcu_read_lock();
1835        if (!dir)
1836                dir = d_inode_rcu(dentry->d_parent);
1837        if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
1838                *pino = ceph_ino(dir);
1839                rcu_read_unlock();
1840                *ppath = dentry->d_name.name;
1841                *ppathlen = dentry->d_name.len;
1842                return 0;
1843        }
1844        rcu_read_unlock();
1845        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1846        if (IS_ERR(path))
1847                return PTR_ERR(path);
1848        *ppath = path;
1849        *pfreepath = 1;
1850        return 0;
1851}
1852
1853static int build_inode_path(struct inode *inode,
1854                            const char **ppath, int *ppathlen, u64 *pino,
1855                            int *pfreepath)
1856{
1857        struct dentry *dentry;
1858        char *path;
1859
1860        if (ceph_snap(inode) == CEPH_NOSNAP) {
1861                *pino = ceph_ino(inode);
1862                *ppathlen = 0;
1863                return 0;
1864        }
1865        dentry = d_find_alias(inode);
1866        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1867        dput(dentry);
1868        if (IS_ERR(path))
1869                return PTR_ERR(path);
1870        *ppath = path;
1871        *pfreepath = 1;
1872        return 0;
1873}
1874
1875/*
1876 * request arguments may be specified via an inode *, a dentry *, or
1877 * an explicit ino+path.
1878 */
1879static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1880                                  struct inode *rdiri, const char *rpath,
1881                                  u64 rino, const char **ppath, int *pathlen,
1882                                  u64 *ino, int *freepath)
1883{
1884        int r = 0;
1885
1886        if (rinode) {
1887                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1888                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1889                     ceph_snap(rinode));
1890        } else if (rdentry) {
1891                r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
1892                                        freepath);
1893                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1894                     *ppath);
1895        } else if (rpath || rino) {
1896                *ino = rino;
1897                *ppath = rpath;
1898                *pathlen = rpath ? strlen(rpath) : 0;
1899                dout(" path %.*s\n", *pathlen, rpath);
1900        }
1901
1902        return r;
1903}
1904
1905/*
1906 * called under mdsc->mutex
1907 */
1908static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1909                                               struct ceph_mds_request *req,
1910                                               int mds, bool drop_cap_releases)
1911{
1912        struct ceph_msg *msg;
1913        struct ceph_mds_request_head *head;
1914        const char *path1 = NULL;
1915        const char *path2 = NULL;
1916        u64 ino1 = 0, ino2 = 0;
1917        int pathlen1 = 0, pathlen2 = 0;
1918        int freepath1 = 0, freepath2 = 0;
1919        int len;
1920        u16 releases;
1921        void *p, *end;
1922        int ret;
1923
1924        ret = set_request_path_attr(req->r_inode, req->r_dentry,
1925                              req->r_parent, req->r_path1, req->r_ino1.ino,
1926                              &path1, &pathlen1, &ino1, &freepath1);
1927        if (ret < 0) {
1928                msg = ERR_PTR(ret);
1929                goto out;
1930        }
1931
1932        ret = set_request_path_attr(NULL, req->r_old_dentry,
1933                              req->r_old_dentry_dir,
1934                              req->r_path2, req->r_ino2.ino,
1935                              &path2, &pathlen2, &ino2, &freepath2);
1936        if (ret < 0) {
1937                msg = ERR_PTR(ret);
1938                goto out_free1;
1939        }
1940
1941        len = sizeof(*head) +
1942                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1943                sizeof(struct ceph_timespec);
1944
1945        /* calculate (max) length for cap releases */
1946        len += sizeof(struct ceph_mds_request_release) *
1947                (!!req->r_inode_drop + !!req->r_dentry_drop +
1948                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1949        if (req->r_dentry_drop)
1950                len += req->r_dentry->d_name.len;
1951        if (req->r_old_dentry_drop)
1952                len += req->r_old_dentry->d_name.len;
1953
1954        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1955        if (!msg) {
1956                msg = ERR_PTR(-ENOMEM);
1957                goto out_free2;
1958        }
1959
1960        msg->hdr.version = cpu_to_le16(2);
1961        msg->hdr.tid = cpu_to_le64(req->r_tid);
1962
1963        head = msg->front.iov_base;
1964        p = msg->front.iov_base + sizeof(*head);
1965        end = msg->front.iov_base + msg->front.iov_len;
1966
1967        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1968        head->op = cpu_to_le32(req->r_op);
1969        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1970        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1971        head->args = req->r_args;
1972
1973        ceph_encode_filepath(&p, end, ino1, path1);
1974        ceph_encode_filepath(&p, end, ino2, path2);
1975
1976        /* make note of release offset, in case we need to replay */
1977        req->r_request_release_offset = p - msg->front.iov_base;
1978
1979        /* cap releases */
1980        releases = 0;
1981        if (req->r_inode_drop)
1982                releases += ceph_encode_inode_release(&p,
1983                      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1984                      mds, req->r_inode_drop, req->r_inode_unless, 0);
1985        if (req->r_dentry_drop)
1986                releases += ceph_encode_dentry_release(&p, req->r_dentry,
1987                                req->r_parent, mds, req->r_dentry_drop,
1988                                req->r_dentry_unless);
1989        if (req->r_old_dentry_drop)
1990                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1991                                req->r_old_dentry_dir, mds,
1992                                req->r_old_dentry_drop,
1993                                req->r_old_dentry_unless);
1994        if (req->r_old_inode_drop)
1995                releases += ceph_encode_inode_release(&p,
1996                      req->r_old_dentry->d_inode,
1997                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1998
1999        if (drop_cap_releases) {
2000                releases = 0;
2001                p = msg->front.iov_base + req->r_request_release_offset;
2002        }
2003
2004        head->num_releases = cpu_to_le16(releases);
2005
2006        /* time stamp */
2007        {
2008                struct ceph_timespec ts;
2009                ceph_encode_timespec(&ts, &req->r_stamp);
2010                ceph_encode_copy(&p, &ts, sizeof(ts));
2011        }
2012
2013        BUG_ON(p > end);
2014        msg->front.iov_len = p - msg->front.iov_base;
2015        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2016
2017        if (req->r_pagelist) {
2018                struct ceph_pagelist *pagelist = req->r_pagelist;
2019                atomic_inc(&pagelist->refcnt);
2020                ceph_msg_data_add_pagelist(msg, pagelist);
2021                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2022        } else {
2023                msg->hdr.data_len = 0;
2024        }
2025
2026        msg->hdr.data_off = cpu_to_le16(0);
2027
2028out_free2:
2029        if (freepath2)
2030                kfree((char *)path2);
2031out_free1:
2032        if (freepath1)
2033                kfree((char *)path1);
2034out:
2035        return msg;
2036}
2037
2038/*
2039 * called under mdsc->mutex if error, under no mutex if
2040 * success.
2041 */
2042static void complete_request(struct ceph_mds_client *mdsc,
2043                             struct ceph_mds_request *req)
2044{
2045        if (req->r_callback)
2046                req->r_callback(mdsc, req);
2047        else
2048                complete_all(&req->r_completion);
2049}
2050
2051/*
2052 * called under mdsc->mutex
2053 */
2054static int __prepare_send_request(struct ceph_mds_client *mdsc,
2055                                  struct ceph_mds_request *req,
2056                                  int mds, bool drop_cap_releases)
2057{
2058        struct ceph_mds_request_head *rhead;
2059        struct ceph_msg *msg;
2060        int flags = 0;
2061
2062        req->r_attempts++;
2063        if (req->r_inode) {
2064                struct ceph_cap *cap =
2065                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2066
2067                if (cap)
2068                        req->r_sent_on_mseq = cap->mseq;
2069                else
2070                        req->r_sent_on_mseq = -1;
2071        }
2072        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2073             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2074
2075        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2076                void *p;
2077                /*
2078                 * Replay.  Do not regenerate message (and rebuild
2079                 * paths, etc.); just use the original message.
2080                 * Rebuilding paths will break for renames because
2081                 * d_move mangles the src name.
2082                 */
2083                msg = req->r_request;
2084                rhead = msg->front.iov_base;
2085
2086                flags = le32_to_cpu(rhead->flags);
2087                flags |= CEPH_MDS_FLAG_REPLAY;
2088                rhead->flags = cpu_to_le32(flags);
2089
2090                if (req->r_target_inode)
2091                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2092
2093                rhead->num_retry = req->r_attempts - 1;
2094
2095                /* remove cap/dentry releases from message */
2096                rhead->num_releases = 0;
2097
2098                /* time stamp */
2099                p = msg->front.iov_base + req->r_request_release_offset;
2100                {
2101                        struct ceph_timespec ts;
2102                        ceph_encode_timespec(&ts, &req->r_stamp);
2103                        ceph_encode_copy(&p, &ts, sizeof(ts));
2104                }
2105
2106                msg->front.iov_len = p - msg->front.iov_base;
2107                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2108                return 0;
2109        }
2110
2111        if (req->r_request) {
2112                ceph_msg_put(req->r_request);
2113                req->r_request = NULL;
2114        }
2115        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2116        if (IS_ERR(msg)) {
2117                req->r_err = PTR_ERR(msg);
2118                return PTR_ERR(msg);
2119        }
2120        req->r_request = msg;
2121
2122        rhead = msg->front.iov_base;
2123        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2124        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2125                flags |= CEPH_MDS_FLAG_REPLAY;
2126        if (req->r_parent)
2127                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2128        rhead->flags = cpu_to_le32(flags);
2129        rhead->num_fwd = req->r_num_fwd;
2130        rhead->num_retry = req->r_attempts - 1;
2131        rhead->ino = 0;
2132
2133        dout(" r_parent = %p\n", req->r_parent);
2134        return 0;
2135}
2136
2137/*
2138 * send request, or put it on the appropriate wait list.
2139 */
2140static int __do_request(struct ceph_mds_client *mdsc,
2141                        struct ceph_mds_request *req)
2142{
2143        struct ceph_mds_session *session = NULL;
2144        int mds = -1;
2145        int err = 0;
2146
2147        if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2148                if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2149                        __unregister_request(mdsc, req);
2150                goto out;
2151        }
2152
2153        if (req->r_timeout &&
2154            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2155                dout("do_request timed out\n");
2156                err = -EIO;
2157                goto finish;
2158        }
2159        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2160                dout("do_request forced umount\n");
2161                err = -EIO;
2162                goto finish;
2163        }
2164        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2165                if (mdsc->mdsmap_err) {
2166                        err = mdsc->mdsmap_err;
2167                        dout("do_request mdsmap err %d\n", err);
2168                        goto finish;
2169                }
2170                if (mdsc->mdsmap->m_epoch == 0) {
2171                        dout("do_request no mdsmap, waiting for map\n");
2172                        list_add(&req->r_wait, &mdsc->waiting_for_map);
2173                        goto finish;
2174                }
2175                if (!(mdsc->fsc->mount_options->flags &
2176                      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2177                    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2178                        err = -ENOENT;
2179                        pr_info("probably no mds server is up\n");
2180                        goto finish;
2181                }
2182        }
2183
2184        put_request_session(req);
2185
2186        mds = __choose_mds(mdsc, req);
2187        if (mds < 0 ||
2188            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2189                dout("do_request no mds or not active, waiting for map\n");
2190                list_add(&req->r_wait, &mdsc->waiting_for_map);
2191                goto out;
2192        }
2193
2194        /* get, open session */
2195        session = __ceph_lookup_mds_session(mdsc, mds);
2196        if (!session) {
2197                session = register_session(mdsc, mds);
2198                if (IS_ERR(session)) {
2199                        err = PTR_ERR(session);
2200                        goto finish;
2201                }
2202        }
2203        req->r_session = get_session(session);
2204
2205        dout("do_request mds%d session %p state %s\n", mds, session,
2206             ceph_session_state_name(session->s_state));
2207        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2208            session->s_state != CEPH_MDS_SESSION_HUNG) {
2209                if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2210                        err = -EACCES;
2211                        goto out_session;
2212                }
2213                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2214                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2215                        __open_session(mdsc, session);
2216                list_add(&req->r_wait, &session->s_waiting);
2217                goto out_session;
2218        }
2219
2220        /* send request */
2221        req->r_resend_mds = -1;   /* forget any previous mds hint */
2222
2223        if (req->r_request_started == 0)   /* note request start time */
2224                req->r_request_started = jiffies;
2225
2226        err = __prepare_send_request(mdsc, req, mds, false);
2227        if (!err) {
2228                ceph_msg_get(req->r_request);
2229                ceph_con_send(&session->s_con, req->r_request);
2230        }
2231
2232out_session:
2233        ceph_put_mds_session(session);
2234finish:
2235        if (err) {
2236                dout("__do_request early error %d\n", err);
2237                req->r_err = err;
2238                complete_request(mdsc, req);
2239                __unregister_request(mdsc, req);
2240        }
2241out:
2242        return err;
2243}
2244
2245/*
2246 * called under mdsc->mutex
2247 */
2248static void __wake_requests(struct ceph_mds_client *mdsc,
2249                            struct list_head *head)
2250{
2251        struct ceph_mds_request *req;
2252        LIST_HEAD(tmp_list);
2253
2254        list_splice_init(head, &tmp_list);
2255
2256        while (!list_empty(&tmp_list)) {
2257                req = list_entry(tmp_list.next,
2258                                 struct ceph_mds_request, r_wait);
2259                list_del_init(&req->r_wait);
2260                dout(" wake request %p tid %llu\n", req, req->r_tid);
2261                __do_request(mdsc, req);
2262        }
2263}
2264
2265/*
2266 * Wake up threads with requests pending for @mds, so that they can
2267 * resubmit their requests to a possibly different mds.
2268 */
2269static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2270{
2271        struct ceph_mds_request *req;
2272        struct rb_node *p = rb_first(&mdsc->request_tree);
2273
2274        dout("kick_requests mds%d\n", mds);
2275        while (p) {
2276                req = rb_entry(p, struct ceph_mds_request, r_node);
2277                p = rb_next(p);
2278                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2279                        continue;
2280                if (req->r_attempts > 0)
2281                        continue; /* only new requests */
2282                if (req->r_session &&
2283                    req->r_session->s_mds == mds) {
2284                        dout(" kicking tid %llu\n", req->r_tid);
2285                        list_del_init(&req->r_wait);
2286                        __do_request(mdsc, req);
2287                }
2288        }
2289}
2290
2291void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2292                              struct ceph_mds_request *req)
2293{
2294        dout("submit_request on %p\n", req);
2295        mutex_lock(&mdsc->mutex);
2296        __register_request(mdsc, req, NULL);
2297        __do_request(mdsc, req);
2298        mutex_unlock(&mdsc->mutex);
2299}
2300
/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
2305int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2306                         struct inode *dir,
2307                         struct ceph_mds_request *req)
2308{
2309        int err;
2310
2311        dout("do_request on %p\n", req);
2312
2313        /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2314        if (req->r_inode)
2315                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2316        if (req->r_parent)
2317                ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2318        if (req->r_old_dentry_dir)
2319                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2320                                  CEPH_CAP_PIN);
2321
2322        /* deny access to directories with pool_ns layouts */
2323        if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
2324            ceph_inode(req->r_inode)->i_pool_ns_len)
2325                return -EIO;
2326        if (test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
2327            ceph_inode(req->r_parent)->i_pool_ns_len)
2328                return -EIO;
2329
2330        /* issue */
2331        mutex_lock(&mdsc->mutex);
2332        __register_request(mdsc, req, dir);
2333        __do_request(mdsc, req);
2334
2335        if (req->r_err) {
2336                err = req->r_err;
2337                goto out;
2338        }
2339
2340        /* wait */
2341        mutex_unlock(&mdsc->mutex);
2342        dout("do_request waiting\n");
2343        if (!req->r_timeout && req->r_wait_for_completion) {
2344                err = req->r_wait_for_completion(mdsc, req);
2345        } else {
2346                long timeleft = wait_for_completion_killable_timeout(
2347                                        &req->r_completion,
2348                                        ceph_timeout_jiffies(req->r_timeout));
2349                if (timeleft > 0)
2350                        err = 0;
2351                else if (!timeleft)
2352                        err = -EIO;  /* timed out */
2353                else
2354                        err = timeleft;  /* killed */
2355        }
2356        dout("do_request waited, got %d\n", err);
2357        mutex_lock(&mdsc->mutex);
2358
2359        /* only abort if we didn't race with a real reply */
2360        if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2361                err = le32_to_cpu(req->r_reply_info.head->result);
2362        } else if (err < 0) {
2363                dout("aborted request %lld with %d\n", req->r_tid, err);
2364
2365                /*
2366                 * ensure we aren't running concurrently with
2367                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2368                 * rely on locks (dir mutex) held by our caller.
2369                 */
2370                mutex_lock(&req->r_fill_mutex);
2371                req->r_err = err;
2372                set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2373                mutex_unlock(&req->r_fill_mutex);
2374
2375                if (req->r_parent &&
2376                    (req->r_op & CEPH_MDS_OP_WRITE))
2377                        ceph_invalidate_dir_request(req);
2378        } else {
2379                err = req->r_err;
2380        }
2381
2382out:
2383        mutex_unlock(&mdsc->mutex);
2384        dout("do_request %p done, result %d\n", req, err);
2385        return err;
2386}
2387
2388/*
2389 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2390 * namespace request.
2391 */
2392void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2393{
2394        struct inode *inode = req->r_parent;
2395
2396        dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2397
2398        ceph_dir_clear_complete(inode);
2399        if (req->r_dentry)
2400                ceph_invalidate_dentry_lease(req->r_dentry);
2401        if (req->r_old_dentry)
2402                ceph_invalidate_dentry_lease(req->r_old_dentry);
2403}
2404
2405/*
2406 * Handle mds reply.
2407 *
2408 * We take the session mutex and parse and process the reply immediately.
2409 * This preserves the logical ordering of replies, capabilities, etc., sent
2410 * by the MDS as they are applied to our local cache.
2411 */
2412static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2413{
2414        struct ceph_mds_client *mdsc = session->s_mdsc;
2415        struct ceph_mds_request *req;
2416        struct ceph_mds_reply_head *head = msg->front.iov_base;
2417        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2418        struct ceph_snap_realm *realm;
2419        u64 tid;
2420        int err, result;
2421        int mds = session->s_mds;
2422
2423        if (msg->front.iov_len < sizeof(*head)) {
2424                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2425                ceph_msg_dump(msg);
2426                return;
2427        }
2428
2429        /* get request, session */
2430        tid = le64_to_cpu(msg->hdr.tid);
2431        mutex_lock(&mdsc->mutex);
2432        req = lookup_get_request(mdsc, tid);
2433        if (!req) {
2434                dout("handle_reply on unknown tid %llu\n", tid);
2435                mutex_unlock(&mdsc->mutex);
2436                return;
2437        }
2438        dout("handle_reply %p\n", req);
2439
2440        /* correct session? */
2441        if (req->r_session != session) {
2442                pr_err("mdsc_handle_reply got %llu on session mds%d"
2443                       " not mds%d\n", tid, session->s_mds,
2444                       req->r_session ? req->r_session->s_mds : -1);
2445                mutex_unlock(&mdsc->mutex);
2446                goto out;
2447        }
2448
2449        /* dup? */
2450        if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2451            (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2452                pr_warn("got a dup %s reply on %llu from mds%d\n",
2453                           head->safe ? "safe" : "unsafe", tid, mds);
2454                mutex_unlock(&mdsc->mutex);
2455                goto out;
2456        }
2457        if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2458                pr_warn("got unsafe after safe on %llu from mds%d\n",
2459                           tid, mds);
2460                mutex_unlock(&mdsc->mutex);
2461                goto out;
2462        }
2463
2464        result = le32_to_cpu(head->result);
2465
2466        /*
2467         * Handle an ESTALE
2468         * if we're not talking to the authority, send to them
2469         * if the authority has changed while we weren't looking,
2470         * send to new authority
2471         * Otherwise we just have to return an ESTALE
2472         */
2473        if (result == -ESTALE) {
2474                dout("got ESTALE on request %llu", req->r_tid);
2475                req->r_resend_mds = -1;
2476                if (req->r_direct_mode != USE_AUTH_MDS) {
2477                        dout("not using auth, setting for that now");
2478                        req->r_direct_mode = USE_AUTH_MDS;
2479                        __do_request(mdsc, req);
2480                        mutex_unlock(&mdsc->mutex);
2481                        goto out;
2482                } else  {
2483                        int mds = __choose_mds(mdsc, req);
2484                        if (mds >= 0 && mds != req->r_session->s_mds) {
2485                                dout("but auth changed, so resending");
2486                                __do_request(mdsc, req);
2487                                mutex_unlock(&mdsc->mutex);
2488                                goto out;
2489                        }
2490                }
2491                dout("have to return ESTALE on request %llu", req->r_tid);
2492        }
2493
2494
2495        if (head->safe) {
2496                set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2497                __unregister_request(mdsc, req);
2498
2499                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2500                        /*
2501                         * We already handled the unsafe response, now do the
2502                         * cleanup.  No need to examine the response; the MDS
2503                         * doesn't include any result info in the safe
2504                         * response.  And even if it did, there is nothing
2505                         * useful we could do with a revised return value.
2506                         */
2507                        dout("got safe reply %llu, mds%d\n", tid, mds);
2508
2509                        /* last unsafe request during umount? */
2510                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2511                                complete_all(&mdsc->safe_umount_waiters);
2512                        mutex_unlock(&mdsc->mutex);
2513                        goto out;
2514                }
2515        } else {
2516                set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2517                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2518                if (req->r_unsafe_dir) {
2519                        struct ceph_inode_info *ci =
2520                                        ceph_inode(req->r_unsafe_dir);
2521                        spin_lock(&ci->i_unsafe_lock);
2522                        list_add_tail(&req->r_unsafe_dir_item,
2523                                      &ci->i_unsafe_dirops);
2524                        spin_unlock(&ci->i_unsafe_lock);
2525                }
2526        }
2527
2528        dout("handle_reply tid %lld result %d\n", tid, result);
2529        rinfo = &req->r_reply_info;
2530        err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2531        mutex_unlock(&mdsc->mutex);
2532
2533        mutex_lock(&session->s_mutex);
2534        if (err < 0) {
2535                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2536                ceph_msg_dump(msg);
2537                goto out_err;
2538        }
2539
2540        /* snap trace */
2541        realm = NULL;
2542        if (rinfo->snapblob_len) {
2543                down_write(&mdsc->snap_rwsem);
2544                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2545                                rinfo->snapblob + rinfo->snapblob_len,
2546                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2547                                &realm);
2548                downgrade_write(&mdsc->snap_rwsem);
2549        } else {
2550                down_read(&mdsc->snap_rwsem);
2551        }
2552
2553        /* insert trace into our cache */
2554        mutex_lock(&req->r_fill_mutex);
2555        current->journal_info = req;
2556        err = ceph_fill_trace(mdsc->fsc->sb, req);
2557        if (err == 0) {
2558                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2559                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2560                        ceph_readdir_prepopulate(req, req->r_session);
2561                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2562        }
2563        current->journal_info = NULL;
2564        mutex_unlock(&req->r_fill_mutex);
2565
2566        up_read(&mdsc->snap_rwsem);
2567        if (realm)
2568                ceph_put_snap_realm(mdsc, realm);
2569
2570        if (err == 0 && req->r_target_inode &&
2571            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2572                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2573                spin_lock(&ci->i_unsafe_lock);
2574                list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2575                spin_unlock(&ci->i_unsafe_lock);
2576        }
2577out_err:
2578        mutex_lock(&mdsc->mutex);
2579        if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2580                if (err) {
2581                        req->r_err = err;
2582                } else {
2583                        req->r_reply =  ceph_msg_get(msg);
2584                        set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2585                }
2586        } else {
2587                dout("reply arrived after request %lld was aborted\n", tid);
2588        }
2589        mutex_unlock(&mdsc->mutex);
2590
2591        mutex_unlock(&session->s_mutex);
2592
2593        /* kick calling process */
2594        complete_request(mdsc, req);
2595out:
2596        ceph_mdsc_put_request(req);
2597        return;
2598}
2599
2600
2601
2602/*
2603 * handle mds notification that our request has been forwarded.
2604 */
2605static void handle_forward(struct ceph_mds_client *mdsc,
2606                           struct ceph_mds_session *session,
2607                           struct ceph_msg *msg)
2608{
2609        struct ceph_mds_request *req;
2610        u64 tid = le64_to_cpu(msg->hdr.tid);
2611        u32 next_mds;
2612        u32 fwd_seq;
2613        int err = -EINVAL;
2614        void *p = msg->front.iov_base;
2615        void *end = p + msg->front.iov_len;
2616
2617        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2618        next_mds = ceph_decode_32(&p);
2619        fwd_seq = ceph_decode_32(&p);
2620
2621        mutex_lock(&mdsc->mutex);
2622        req = lookup_get_request(mdsc, tid);
2623        if (!req) {
2624                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2625                goto out;  /* dup reply? */
2626        }
2627
2628        if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2629                dout("forward tid %llu aborted, unregistering\n", tid);
2630                __unregister_request(mdsc, req);
2631        } else if (fwd_seq <= req->r_num_fwd) {
2632                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2633                     tid, next_mds, req->r_num_fwd, fwd_seq);
2634        } else {
2635                /* resend. forward race not possible; mds would drop */
2636                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2637                BUG_ON(req->r_err);
2638                BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
2639                req->r_attempts = 0;
2640                req->r_num_fwd = fwd_seq;
2641                req->r_resend_mds = next_mds;
2642                put_request_session(req);
2643                __do_request(mdsc, req);
2644        }
2645        ceph_mdsc_put_request(req);
2646out:
2647        mutex_unlock(&mdsc->mutex);
2648        return;
2649
2650bad:
2651        pr_err("mdsc_handle_forward decode error err=%d\n", err);
2652}
2653
2654/*
2655 * handle a mds session control message
2656 */
2657static void handle_session(struct ceph_mds_session *session,
2658                           struct ceph_msg *msg)
2659{
2660        struct ceph_mds_client *mdsc = session->s_mdsc;
2661        u32 op;
2662        u64 seq;
2663        int mds = session->s_mds;
2664        struct ceph_mds_session_head *h = msg->front.iov_base;
2665        int wake = 0;
2666
2667        /* decode */
2668        if (msg->front.iov_len != sizeof(*h))
2669                goto bad;
2670        op = le32_to_cpu(h->op);
2671        seq = le64_to_cpu(h->seq);
2672
2673        mutex_lock(&mdsc->mutex);
2674        if (op == CEPH_SESSION_CLOSE) {
2675                get_session(session);
2676                __unregister_session(mdsc, session);
2677        }
2678        /* FIXME: this ttl calculation is generous */
2679        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2680        mutex_unlock(&mdsc->mutex);
2681
2682        mutex_lock(&session->s_mutex);
2683
2684        dout("handle_session mds%d %s %p state %s seq %llu\n",
2685             mds, ceph_session_op_name(op), session,
2686             ceph_session_state_name(session->s_state), seq);
2687
2688        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2689                session->s_state = CEPH_MDS_SESSION_OPEN;
2690                pr_info("mds%d came back\n", session->s_mds);
2691        }
2692
2693        switch (op) {
2694        case CEPH_SESSION_OPEN:
2695                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2696                        pr_info("mds%d reconnect success\n", session->s_mds);
2697                session->s_state = CEPH_MDS_SESSION_OPEN;
2698                renewed_caps(mdsc, session, 0);
2699                wake = 1;
2700                if (mdsc->stopping)
2701                        __close_session(mdsc, session);
2702                break;
2703
2704        case CEPH_SESSION_RENEWCAPS:
2705                if (session->s_renew_seq == seq)
2706                        renewed_caps(mdsc, session, 1);
2707                break;
2708
2709        case CEPH_SESSION_CLOSE:
2710                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2711                        pr_info("mds%d reconnect denied\n", session->s_mds);
2712                cleanup_session_requests(mdsc, session);
2713                remove_session_caps(session);
2714                wake = 2; /* for good measure */
2715                wake_up_all(&mdsc->session_close_wq);
2716                break;
2717
2718        case CEPH_SESSION_STALE:
2719                pr_info("mds%d caps went stale, renewing\n",
2720                        session->s_mds);
2721                spin_lock(&session->s_gen_ttl_lock);
2722                session->s_cap_gen++;
2723                session->s_cap_ttl = jiffies - 1;
2724                spin_unlock(&session->s_gen_ttl_lock);
2725                send_renew_caps(mdsc, session);
2726                break;
2727
2728        case CEPH_SESSION_RECALL_STATE:
2729                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2730                break;
2731
2732        case CEPH_SESSION_FLUSHMSG:
2733                send_flushmsg_ack(mdsc, session, seq);
2734                break;
2735
2736        case CEPH_SESSION_FORCE_RO:
2737                dout("force_session_readonly %p\n", session);
2738                spin_lock(&session->s_cap_lock);
2739                session->s_readonly = true;
2740                spin_unlock(&session->s_cap_lock);
2741                wake_up_session_caps(session, 0);
2742                break;
2743
2744        case CEPH_SESSION_REJECT:
2745                WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
2746                pr_info("mds%d rejected session\n", session->s_mds);
2747                session->s_state = CEPH_MDS_SESSION_REJECTED;
2748                cleanup_session_requests(mdsc, session);
2749                remove_session_caps(session);
2750                wake = 2; /* for good measure */
2751                break;
2752
2753        default:
2754                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2755                WARN_ON(1);
2756        }
2757
2758        mutex_unlock(&session->s_mutex);
2759        if (wake) {
2760                mutex_lock(&mdsc->mutex);
2761                __wake_requests(mdsc, &session->s_waiting);
2762                if (wake == 2)
2763                        kick_requests(mdsc, mds);
2764                mutex_unlock(&mdsc->mutex);
2765        }
2766        if (op == CEPH_SESSION_CLOSE)
2767                ceph_put_mds_session(session);
2768        return;
2769
2770bad:
2771        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2772               (int)msg->front.iov_len);
2773        ceph_msg_dump(msg);
2774        return;
2775}
2776
2777
2778/*
2779 * called under session->mutex.
2780 */
2781static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2782                                   struct ceph_mds_session *session)
2783{
2784        struct ceph_mds_request *req, *nreq;
2785        struct rb_node *p;
2786        int err;
2787
2788        dout("replay_unsafe_requests mds%d\n", session->s_mds);
2789
2790        mutex_lock(&mdsc->mutex);
2791        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2792                err = __prepare_send_request(mdsc, req, session->s_mds, true);
2793                if (!err) {
2794                        ceph_msg_get(req->r_request);
2795                        ceph_con_send(&session->s_con, req->r_request);
2796                }
2797        }
2798
2799        /*
2800         * also re-send old requests when MDS enters reconnect stage. So that MDS
2801         * can process completed request in clientreplay stage.
2802         */
2803        p = rb_first(&mdsc->request_tree);
2804        while (p) {
2805                req = rb_entry(p, struct ceph_mds_request, r_node);
2806                p = rb_next(p);
2807                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2808                        continue;
2809                if (req->r_attempts == 0)
2810                        continue; /* only old requests */
2811                if (req->r_session &&
2812                    req->r_session->s_mds == session->s_mds) {
2813                        err = __prepare_send_request(mdsc, req,
2814                                                     session->s_mds, true);
2815                        if (!err) {
2816                                ceph_msg_get(req->r_request);
2817                                ceph_con_send(&session->s_con, req->r_request);
2818                        }
2819                }
2820        }
2821        mutex_unlock(&mdsc->mutex);
2822}
2823
2824/*
2825 * Encode information about a cap for a reconnect with the MDS.
2826 */
2827static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2828                          void *arg)
2829{
2830        union {
2831                struct ceph_mds_cap_reconnect v2;
2832                struct ceph_mds_cap_reconnect_v1 v1;
2833        } rec;
2834        struct ceph_inode_info *ci;
2835        struct ceph_reconnect_state *recon_state = arg;
2836        struct ceph_pagelist *pagelist = recon_state->pagelist;
2837        char *path;
2838        int pathlen, err;
2839        u64 pathbase;
2840        u64 snap_follows;
2841        struct dentry *dentry;
2842
2843        ci = cap->ci;
2844
2845        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2846             inode, ceph_vinop(inode), cap, cap->cap_id,
2847             ceph_cap_string(cap->issued));
2848        err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2849        if (err)
2850                return err;
2851
2852        dentry = d_find_alias(inode);
2853        if (dentry) {
2854                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2855                if (IS_ERR(path)) {
2856                        err = PTR_ERR(path);
2857                        goto out_dput;
2858                }
2859        } else {
2860                path = NULL;
2861                pathlen = 0;
2862                pathbase = 0;
2863        }
2864
2865        spin_lock(&ci->i_ceph_lock);
2866        cap->seq = 0;        /* reset cap seq */
2867        cap->issue_seq = 0;  /* and issue_seq */
2868        cap->mseq = 0;       /* and migrate_seq */
2869        cap->cap_gen = cap->session->s_cap_gen;
2870
2871        if (recon_state->msg_version >= 2) {
2872                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2873                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2874                rec.v2.issued = cpu_to_le32(cap->issued);
2875                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2876                rec.v2.pathbase = cpu_to_le64(pathbase);
2877                rec.v2.flock_len = 0;
2878        } else {
2879                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2880                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2881                rec.v1.issued = cpu_to_le32(cap->issued);
2882                rec.v1.size = cpu_to_le64(inode->i_size);
2883                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2884                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2885                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2886                rec.v1.pathbase = cpu_to_le64(pathbase);
2887        }
2888
2889        if (list_empty(&ci->i_cap_snaps)) {
2890                snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
2891        } else {
2892                struct ceph_cap_snap *capsnap =
2893                        list_first_entry(&ci->i_cap_snaps,
2894                                         struct ceph_cap_snap, ci_item);
2895                snap_follows = capsnap->follows;
2896        }
2897        spin_unlock(&ci->i_ceph_lock);
2898
2899        if (recon_state->msg_version >= 2) {
2900                int num_fcntl_locks, num_flock_locks;
2901                struct ceph_filelock *flocks;
2902                size_t struct_len, total_len = 0;
2903                u8 struct_v = 0;
2904
2905encode_again:
2906                spin_lock(&inode->i_lock);
2907                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2908                spin_unlock(&inode->i_lock);
2909                flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2910                                 sizeof(struct ceph_filelock), GFP_NOFS);
2911                if (!flocks) {
2912                        err = -ENOMEM;
2913                        goto out_free;
2914                }
2915                spin_lock(&inode->i_lock);
2916                err = ceph_encode_locks_to_buffer(inode, flocks,
2917                                                  num_fcntl_locks,
2918                                                  num_flock_locks);
2919                spin_unlock(&inode->i_lock);
2920                if (err) {
2921                        kfree(flocks);
2922                        if (err == -ENOSPC)
2923                                goto encode_again;
2924                        goto out_free;
2925                }
2926
2927                if (recon_state->msg_version >= 3) {
2928                        /* version, compat_version and struct_len */
2929                        total_len = 2 * sizeof(u8) + sizeof(u32);
2930                        struct_v = 2;
2931                }
2932                /*
2933                 * number of encoded locks is stable, so copy to pagelist
2934                 */
2935                struct_len = 2 * sizeof(u32) +
2936                            (num_fcntl_locks + num_flock_locks) *
2937                            sizeof(struct ceph_filelock);
2938                rec.v2.flock_len = cpu_to_le32(struct_len);
2939
2940                struct_len += sizeof(rec.v2);
2941                struct_len += sizeof(u32) + pathlen;
2942
2943                if (struct_v >= 2)
2944                        struct_len += sizeof(u64); /* snap_follows */
2945
2946                total_len += struct_len;
2947                err = ceph_pagelist_reserve(pagelist, total_len);
2948
2949                if (!err) {
2950                        if (recon_state->msg_version >= 3) {
2951                                ceph_pagelist_encode_8(pagelist, struct_v);
2952                                ceph_pagelist_encode_8(pagelist, 1);
2953                                ceph_pagelist_encode_32(pagelist, struct_len);
2954                        }
2955                        ceph_pagelist_encode_string(pagelist, path, pathlen);
2956                        ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
2957                        ceph_locks_to_pagelist(flocks, pagelist,
2958                                               num_fcntl_locks,
2959                                               num_flock_locks);
2960                        if (struct_v >= 2)
2961                                ceph_pagelist_encode_64(pagelist, snap_follows);
2962                }
2963                kfree(flocks);
2964        } else {
2965                size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
2966                err = ceph_pagelist_reserve(pagelist, size);
2967                if (!err) {
2968                        ceph_pagelist_encode_string(pagelist, path, pathlen);
2969                        ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
2970                }
2971        }
2972
2973        recon_state->nr_caps++;
2974out_free:
2975        kfree(path);
2976out_dput:
2977        dput(dentry);
2978        return err;
2979}
2980
2981
2982/*
2983 * If an MDS fails and recovers, clients need to reconnect in order to
2984 * reestablish shared state.  This includes all caps issued through
2985 * this session _and_ the snap_realm hierarchy.  Because it's not
2986 * clear which snap realms the mds cares about, we send everything we
2987 * know about.. that ensures we'll then get any new info the
2988 * recovering MDS might have.
2989 *
2990 * This is a relatively heavyweight operation, but it's rare.
2991 *
2992 * called with mdsc->mutex held.
2993 */
2994static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2995                               struct ceph_mds_session *session)
2996{
2997        struct ceph_msg *reply;
2998        struct rb_node *p;
2999        int mds = session->s_mds;
3000        int err = -ENOMEM;
3001        int s_nr_caps;
3002        struct ceph_pagelist *pagelist;
3003        struct ceph_reconnect_state recon_state;
3004
3005        pr_info("mds%d reconnect start\n", mds);
3006
3007        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
3008        if (!pagelist)
3009                goto fail_nopagelist;
3010        ceph_pagelist_init(pagelist);
3011
3012        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
3013        if (!reply)
3014                goto fail_nomsg;
3015
3016        mutex_lock(&session->s_mutex);
3017        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3018        session->s_seq = 0;
3019
3020        dout("session %p state %s\n", session,
3021             ceph_session_state_name(session->s_state));
3022
3023        spin_lock(&session->s_gen_ttl_lock);
3024        session->s_cap_gen++;
3025        spin_unlock(&session->s_gen_ttl_lock);
3026
3027        spin_lock(&session->s_cap_lock);
3028        /* don't know if session is readonly */
3029        session->s_readonly = 0;
3030        /*
3031         * notify __ceph_remove_cap() that we are composing cap reconnect.
3032         * If a cap get released before being added to the cap reconnect,
3033         * __ceph_remove_cap() should skip queuing cap release.
3034         */
3035        session->s_cap_reconnect = 1;
3036        /* drop old cap expires; we're about to reestablish that state */
3037        cleanup_cap_releases(mdsc, session);
3038
3039        /* trim unused caps to reduce MDS's cache rejoin time */
3040        if (mdsc->fsc->sb->s_root)
3041                shrink_dcache_parent(mdsc->fsc->sb->s_root);
3042
3043        ceph_con_close(&session->s_con);
3044        ceph_con_open(&session->s_con,
3045                      CEPH_ENTITY_TYPE_MDS, mds,
3046                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3047
3048        /* replay unsafe requests */
3049        replay_unsafe_requests(mdsc, session);
3050
3051        down_read(&mdsc->snap_rwsem);
3052
3053        /* traverse this session's caps */
3054        s_nr_caps = session->s_nr_caps;
3055        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
3056        if (err)
3057                goto fail;
3058
3059        recon_state.nr_caps = 0;
3060        recon_state.pagelist = pagelist;
3061        if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
3062                recon_state.msg_version = 3;
3063        else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
3064                recon_state.msg_version = 2;
3065        else
3066                recon_state.msg_version = 1;
3067        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
3068        if (err < 0)
3069                goto fail;
3070
3071        spin_lock(&session->s_cap_lock);
3072        session->s_cap_reconnect = 0;
3073        spin_unlock(&session->s_cap_lock);
3074
3075        /*
3076         * snaprealms.  we provide mds with the ino, seq (version), and
3077         * parent for all of our realms.  If the mds has any newer info,
3078         * it will tell us.
3079         */
3080        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3081                struct ceph_snap_realm *realm =
3082                        rb_entry(p, struct ceph_snap_realm, node);
3083                struct ceph_mds_snaprealm_reconnect sr_rec;
3084
3085                dout(" adding snap realm %llx seq %lld parent %llx\n",
3086                     realm->ino, realm->seq, realm->parent_ino);
3087                sr_rec.ino = cpu_to_le64(realm->ino);
3088                sr_rec.seq = cpu_to_le64(realm->seq);
3089                sr_rec.parent = cpu_to_le64(realm->parent_ino);
3090                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3091                if (err)
3092                        goto fail;
3093        }
3094
3095        reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3096
3097        /* raced with cap release? */
3098        if (s_nr_caps != recon_state.nr_caps) {
3099                struct page *page = list_first_entry(&pagelist->head,
3100                                                     struct page, lru);
3101                __le32 *addr = kmap_atomic(page);
3102                *addr = cpu_to_le32(recon_state.nr_caps);
3103                kunmap_atomic(addr);
3104        }
3105
3106        reply->hdr.data_len = cpu_to_le32(pagelist->length);
3107        ceph_msg_data_add_pagelist(reply, pagelist);
3108
3109        ceph_early_kick_flushing_caps(mdsc, session);
3110
3111        ceph_con_send(&session->s_con, reply);
3112
3113        mutex_unlock(&session->s_mutex);
3114
3115        mutex_lock(&mdsc->mutex);
3116        __wake_requests(mdsc, &session->s_waiting);
3117        mutex_unlock(&mdsc->mutex);
3118
3119        up_read(&mdsc->snap_rwsem);
3120        return;
3121
3122fail:
3123        ceph_msg_put(reply);
3124        up_read(&mdsc->snap_rwsem);
3125        mutex_unlock(&session->s_mutex);
3126fail_nomsg:
3127        ceph_pagelist_release(pagelist);
3128fail_nopagelist:
3129        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3130        return;
3131}
3132
3133
3134/*
3135 * compare old and new mdsmaps, kicking requests
3136 * and closing out old connections as necessary
3137 *
3138 * called under mdsc->mutex.
3139 */
3140static void check_new_map(struct ceph_mds_client *mdsc,
3141                          struct ceph_mdsmap *newmap,
3142                          struct ceph_mdsmap *oldmap)
3143{
3144        int i;
3145        int oldstate, newstate;
3146        struct ceph_mds_session *s;
3147
3148        dout("check_new_map new %u old %u\n",
3149             newmap->m_epoch, oldmap->m_epoch);
3150
3151        for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3152                if (!mdsc->sessions[i])
3153                        continue;
3154                s = mdsc->sessions[i];
3155                oldstate = ceph_mdsmap_get_state(oldmap, i);
3156                newstate = ceph_mdsmap_get_state(newmap, i);
3157
3158                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3159                     i, ceph_mds_state_name(oldstate),
3160                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3161                     ceph_mds_state_name(newstate),
3162                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3163                     ceph_session_state_name(s->s_state));
3164
3165                if (i >= newmap->m_num_mds ||
3166                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
3167                           ceph_mdsmap_get_addr(newmap, i),
3168                           sizeof(struct ceph_entity_addr))) {
3169                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3170                                /* the session never opened, just close it
3171                                 * out now */
3172                                get_session(s);
3173                                __unregister_session(mdsc, s);
3174                                __wake_requests(mdsc, &s->s_waiting);
3175                                ceph_put_mds_session(s);
3176                        } else if (i >= newmap->m_num_mds) {
3177                                /* force close session for stopped mds */
3178                                get_session(s);
3179                                __unregister_session(mdsc, s);
3180                                __wake_requests(mdsc, &s->s_waiting);
3181                                kick_requests(mdsc, i);
3182                                mutex_unlock(&mdsc->mutex);
3183
3184                                mutex_lock(&s->s_mutex);
3185                                cleanup_session_requests(mdsc, s);
3186                                remove_session_caps(s);
3187                                mutex_unlock(&s->s_mutex);
3188
3189                                ceph_put_mds_session(s);
3190
3191                                mutex_lock(&mdsc->mutex);
3192                        } else {
3193                                /* just close it */
3194                                mutex_unlock(&mdsc->mutex);
3195                                mutex_lock(&s->s_mutex);
3196                                mutex_lock(&mdsc->mutex);
3197                                ceph_con_close(&s->s_con);
3198                                mutex_unlock(&s->s_mutex);
3199                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
3200                        }
3201                } else if (oldstate == newstate) {
3202                        continue;  /* nothing new with this mds */
3203                }
3204
3205                /*
3206                 * send reconnect?
3207                 */
3208                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3209                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3210                        mutex_unlock(&mdsc->mutex);
3211                        send_mds_reconnect(mdsc, s);
3212                        mutex_lock(&mdsc->mutex);
3213                }
3214
3215                /*
3216                 * kick request on any mds that has gone active.
3217                 */
3218                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3219                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3220                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3221                            oldstate != CEPH_MDS_STATE_STARTING)
3222                                pr_info("mds%d recovery completed\n", s->s_mds);
3223                        kick_requests(mdsc, i);
3224                        ceph_kick_flushing_caps(mdsc, s);
3225                        wake_up_session_caps(s, 1);
3226                }
3227        }
3228
3229        for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3230                s = mdsc->sessions[i];
3231                if (!s)
3232                        continue;
3233                if (!ceph_mdsmap_is_laggy(newmap, i))
3234                        continue;
3235                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3236                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3237                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3238                        dout(" connecting to export targets of laggy mds%d\n",
3239                             i);
3240                        __open_export_target_sessions(mdsc, s);
3241                }
3242        }
3243}
3244
3245
3246
3247/*
3248 * leases
3249 */
3250
3251/*
3252 * caller must hold session s_mutex, dentry->d_lock
3253 */
3254void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3255{
3256        struct ceph_dentry_info *di = ceph_dentry(dentry);
3257
3258        ceph_put_mds_session(di->lease_session);
3259        di->lease_session = NULL;
3260}
3261
3262static void handle_lease(struct ceph_mds_client *mdsc,
3263                         struct ceph_mds_session *session,
3264                         struct ceph_msg *msg)
3265{
3266        struct super_block *sb = mdsc->fsc->sb;
3267        struct inode *inode;
3268        struct dentry *parent, *dentry;
3269        struct ceph_dentry_info *di;
3270        int mds = session->s_mds;
3271        struct ceph_mds_lease *h = msg->front.iov_base;
3272        u32 seq;
3273        struct ceph_vino vino;
3274        struct qstr dname;
3275        int release = 0;
3276
3277        dout("handle_lease from mds%d\n", mds);
3278
3279        /* decode */
3280        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3281                goto bad;
3282        vino.ino = le64_to_cpu(h->ino);
3283        vino.snap = CEPH_NOSNAP;
3284        seq = le32_to_cpu(h->seq);
3285        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3286        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3287        if (dname.len != get_unaligned_le32(h+1))
3288                goto bad;
3289
3290        /* lookup inode */
3291        inode = ceph_find_inode(sb, vino);
3292        dout("handle_lease %s, ino %llx %p %.*s\n",
3293             ceph_lease_op_name(h->action), vino.ino, inode,
3294             dname.len, dname.name);
3295
3296        mutex_lock(&session->s_mutex);
3297        session->s_seq++;
3298
3299        if (!inode) {
3300                dout("handle_lease no inode %llx\n", vino.ino);
3301                goto release;
3302        }
3303
3304        /* dentry */
3305        parent = d_find_alias(inode);
3306        if (!parent) {
3307                dout("no parent dentry on inode %p\n", inode);
3308                WARN_ON(1);
3309                goto release;  /* hrm... */
3310        }
3311        dname.hash = full_name_hash(dname.name, dname.len);
3312        dentry = d_lookup(parent, &dname);
3313        dput(parent);
3314        if (!dentry)
3315                goto release;
3316
3317        spin_lock(&dentry->d_lock);
3318        di = ceph_dentry(dentry);
3319        switch (h->action) {
3320        case CEPH_MDS_LEASE_REVOKE:
3321                if (di->lease_session == session) {
3322                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3323                                h->seq = cpu_to_le32(di->lease_seq);
3324                        __ceph_mdsc_drop_dentry_lease(dentry);
3325                }
3326                release = 1;
3327                break;
3328
3329        case CEPH_MDS_LEASE_RENEW:
3330                if (di->lease_session == session &&
3331                    di->lease_gen == session->s_cap_gen &&
3332                    di->lease_renew_from &&
3333                    di->lease_renew_after == 0) {
3334                        unsigned long duration =
3335                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3336
3337                        di->lease_seq = seq;
3338                        dentry->d_time = di->lease_renew_from + duration;
3339                        di->lease_renew_after = di->lease_renew_from +
3340                                (duration >> 1);
3341                        di->lease_renew_from = 0;
3342                }
3343                break;
3344        }
3345        spin_unlock(&dentry->d_lock);
3346        dput(dentry);
3347
3348        if (!release)
3349                goto out;
3350
3351release:
3352        /* let's just reuse the same message */
3353        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3354        ceph_msg_get(msg);
3355        ceph_con_send(&session->s_con, msg);
3356
3357out:
3358        iput(inode);
3359        mutex_unlock(&session->s_mutex);
3360        return;
3361
3362bad:
3363        pr_err("corrupt lease message\n");
3364        ceph_msg_dump(msg);
3365}
3366
3367void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3368                              struct inode *inode,
3369                              struct dentry *dentry, char action,
3370                              u32 seq)
3371{
3372        struct ceph_msg *msg;
3373        struct ceph_mds_lease *lease;
3374        int len = sizeof(*lease) + sizeof(u32);
3375        int dnamelen = 0;
3376
3377        dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3378             inode, dentry, ceph_lease_op_name(action), session->s_mds);
3379        dnamelen = dentry->d_name.len;
3380        len += dnamelen;
3381
3382        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3383        if (!msg)
3384                return;
3385        lease = msg->front.iov_base;
3386        lease->action = action;
3387        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3388        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3389        lease->seq = cpu_to_le32(seq);
3390        put_unaligned_le32(dnamelen, lease + 1);
3391        memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3392
3393        /*
3394         * if this is a preemptive lease RELEASE, no need to
3395         * flush request stream, since the actual request will
3396         * soon follow.
3397         */
3398        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3399
3400        ceph_con_send(&session->s_con, msg);
3401}
3402
3403/*
3404 * drop all leases (and dentry refs) in preparation for umount
3405 */
3406static void drop_leases(struct ceph_mds_client *mdsc)
3407{
3408        int i;
3409
3410        dout("drop_leases\n");
3411        mutex_lock(&mdsc->mutex);
3412        for (i = 0; i < mdsc->max_sessions; i++) {
3413                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3414                if (!s)
3415                        continue;
3416                mutex_unlock(&mdsc->mutex);
3417                mutex_lock(&s->s_mutex);
3418                mutex_unlock(&s->s_mutex);
3419                ceph_put_mds_session(s);
3420                mutex_lock(&mdsc->mutex);
3421        }
3422        mutex_unlock(&mdsc->mutex);
3423}
3424
3425
3426
3427/*
3428 * delayed work -- periodically trim expired leases, renew caps with mds
3429 */
3430static void schedule_delayed(struct ceph_mds_client *mdsc)
3431{
3432        int delay = 5;
3433        unsigned hz = round_jiffies_relative(HZ * delay);
3434        schedule_delayed_work(&mdsc->delayed_work, hz);
3435}
3436
3437static void delayed_work(struct work_struct *work)
3438{
3439        int i;
3440        struct ceph_mds_client *mdsc =
3441                container_of(work, struct ceph_mds_client, delayed_work.work);
3442        int renew_interval;
3443        int renew_caps;
3444
3445        dout("mdsc delayed_work\n");
3446        ceph_check_delayed_caps(mdsc);
3447
3448        mutex_lock(&mdsc->mutex);
3449        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3450        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3451                                   mdsc->last_renew_caps);
3452        if (renew_caps)
3453                mdsc->last_renew_caps = jiffies;
3454
3455        for (i = 0; i < mdsc->max_sessions; i++) {
3456                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3457                if (!s)
3458                        continue;
3459                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3460                        dout("resending session close request for mds%d\n",
3461                             s->s_mds);
3462                        request_close_session(mdsc, s);
3463                        ceph_put_mds_session(s);
3464                        continue;
3465                }
3466                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3467                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3468                                s->s_state = CEPH_MDS_SESSION_HUNG;
3469                                pr_info("mds%d hung\n", s->s_mds);
3470                        }
3471                }
3472                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3473                        /* this mds is failed or recovering, just wait */
3474                        ceph_put_mds_session(s);
3475                        continue;
3476                }
3477                mutex_unlock(&mdsc->mutex);
3478
3479                mutex_lock(&s->s_mutex);
3480                if (renew_caps)
3481                        send_renew_caps(mdsc, s);
3482                else
3483                        ceph_con_keepalive(&s->s_con);
3484                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3485                    s->s_state == CEPH_MDS_SESSION_HUNG)
3486                        ceph_send_cap_releases(mdsc, s);
3487                mutex_unlock(&s->s_mutex);
3488                ceph_put_mds_session(s);
3489
3490                mutex_lock(&mdsc->mutex);
3491        }
3492        mutex_unlock(&mdsc->mutex);
3493
3494        schedule_delayed(mdsc);
3495}
3496
3497int ceph_mdsc_init(struct ceph_fs_client *fsc)
3498
3499{
3500        struct ceph_mds_client *mdsc;
3501
3502        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3503        if (!mdsc)
3504                return -ENOMEM;
3505        mdsc->fsc = fsc;
3506        fsc->mdsc = mdsc;
3507        mutex_init(&mdsc->mutex);
3508        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3509        if (!mdsc->mdsmap) {
3510                kfree(mdsc);
3511                return -ENOMEM;
3512        }
3513
3514        init_completion(&mdsc->safe_umount_waiters);
3515        init_waitqueue_head(&mdsc->session_close_wq);
3516        INIT_LIST_HEAD(&mdsc->waiting_for_map);
3517        mdsc->sessions = NULL;
3518        atomic_set(&mdsc->num_sessions, 0);
3519        mdsc->max_sessions = 0;
3520        mdsc->stopping = 0;
3521        mdsc->last_snap_seq = 0;
3522        init_rwsem(&mdsc->snap_rwsem);
3523        mdsc->snap_realms = RB_ROOT;
3524        INIT_LIST_HEAD(&mdsc->snap_empty);
3525        spin_lock_init(&mdsc->snap_empty_lock);
3526        mdsc->last_tid = 0;
3527        mdsc->oldest_tid = 0;
3528        mdsc->request_tree = RB_ROOT;
3529        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3530        mdsc->last_renew_caps = jiffies;
3531        INIT_LIST_HEAD(&mdsc->cap_delay_list);
3532        spin_lock_init(&mdsc->cap_delay_lock);
3533        INIT_LIST_HEAD(&mdsc->snap_flush_list);
3534        spin_lock_init(&mdsc->snap_flush_lock);
3535        mdsc->last_cap_flush_tid = 1;
3536        INIT_LIST_HEAD(&mdsc->cap_flush_list);
3537        INIT_LIST_HEAD(&mdsc->cap_dirty);
3538        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3539        mdsc->num_cap_flushing = 0;
3540        spin_lock_init(&mdsc->cap_dirty_lock);
3541        init_waitqueue_head(&mdsc->cap_flushing_wq);
3542        spin_lock_init(&mdsc->dentry_lru_lock);
3543        INIT_LIST_HEAD(&mdsc->dentry_lru);
3544
3545        ceph_caps_init(mdsc);
3546        ceph_adjust_min_caps(mdsc, fsc->min_caps);
3547
3548        init_rwsem(&mdsc->pool_perm_rwsem);
3549        mdsc->pool_perm_tree = RB_ROOT;
3550
3551        strncpy(mdsc->nodename, utsname()->nodename,
3552                sizeof(mdsc->nodename) - 1);
3553        return 0;
3554}
3555
3556/*
3557 * Wait for safe replies on open mds requests.  If we time out, drop
3558 * all requests from the tree to avoid dangling dentry refs.
3559 */
3560static void wait_requests(struct ceph_mds_client *mdsc)
3561{
3562        struct ceph_options *opts = mdsc->fsc->client->options;
3563        struct ceph_mds_request *req;
3564
3565        mutex_lock(&mdsc->mutex);
3566        if (__get_oldest_req(mdsc)) {
3567                mutex_unlock(&mdsc->mutex);
3568
3569                dout("wait_requests waiting for requests\n");
3570                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3571                                    ceph_timeout_jiffies(opts->mount_timeout));
3572
3573                /* tear down remaining requests */
3574                mutex_lock(&mdsc->mutex);
3575                while ((req = __get_oldest_req(mdsc))) {
3576                        dout("wait_requests timed out on tid %llu\n",
3577                             req->r_tid);
3578                        __unregister_request(mdsc, req);
3579                }
3580        }
3581        mutex_unlock(&mdsc->mutex);
3582        dout("wait_requests done\n");
3583}
3584
3585/*
3586 * called before mount is ro, and before dentries are torn down.
3587 * (hmm, does this still race with new lookups?)
3588 */
3589void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3590{
3591        dout("pre_umount\n");
3592        mdsc->stopping = 1;
3593
3594        drop_leases(mdsc);
3595        ceph_flush_dirty_caps(mdsc);
3596        wait_requests(mdsc);
3597
3598        /*
3599         * wait for reply handlers to drop their request refs and
3600         * their inode/dcache refs
3601         */
3602        ceph_msgr_flush();
3603}
3604
3605/*
3606 * wait for all write mds requests to flush.
3607 */
3608static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3609{
3610        struct ceph_mds_request *req = NULL, *nextreq;
3611        struct rb_node *n;
3612
3613        mutex_lock(&mdsc->mutex);
3614        dout("wait_unsafe_requests want %lld\n", want_tid);
3615restart:
3616        req = __get_oldest_req(mdsc);
3617        while (req && req->r_tid <= want_tid) {
3618                /* find next request */
3619                n = rb_next(&req->r_node);
3620                if (n)
3621                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3622                else
3623                        nextreq = NULL;
3624                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3625                    (req->r_op & CEPH_MDS_OP_WRITE)) {
3626                        /* write op */
3627                        ceph_mdsc_get_request(req);
3628                        if (nextreq)
3629                                ceph_mdsc_get_request(nextreq);
3630                        mutex_unlock(&mdsc->mutex);
3631                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3632                             req->r_tid, want_tid);
3633                        wait_for_completion(&req->r_safe_completion);
3634                        mutex_lock(&mdsc->mutex);
3635                        ceph_mdsc_put_request(req);
3636                        if (!nextreq)
3637                                break;  /* next dne before, so we're done! */
3638                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
3639                                /* next request was removed from tree */
3640                                ceph_mdsc_put_request(nextreq);
3641                                goto restart;
3642                        }
3643                        ceph_mdsc_put_request(nextreq);  /* won't go away */
3644                }
3645                req = nextreq;
3646        }
3647        mutex_unlock(&mdsc->mutex);
3648        dout("wait_unsafe_requests done\n");
3649}
3650
3651void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3652{
3653        u64 want_tid, want_flush;
3654
3655        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3656                return;
3657
3658        dout("sync\n");
3659        mutex_lock(&mdsc->mutex);
3660        want_tid = mdsc->last_tid;
3661        mutex_unlock(&mdsc->mutex);
3662
3663        ceph_flush_dirty_caps(mdsc);
3664        spin_lock(&mdsc->cap_dirty_lock);
3665        want_flush = mdsc->last_cap_flush_tid;
3666        if (!list_empty(&mdsc->cap_flush_list)) {
3667                struct ceph_cap_flush *cf =
3668                        list_last_entry(&mdsc->cap_flush_list,
3669                                        struct ceph_cap_flush, g_list);
3670                cf->wake = true;
3671        }
3672        spin_unlock(&mdsc->cap_dirty_lock);
3673
3674        dout("sync want tid %lld flush_seq %lld\n",
3675             want_tid, want_flush);
3676
3677        wait_unsafe_requests(mdsc, want_tid);
3678        wait_caps_flush(mdsc, want_flush);
3679}
3680
3681/*
3682 * true if all sessions are closed, or we force unmount
3683 */
3684static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
3685{
3686        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3687                return true;
3688        return atomic_read(&mdsc->num_sessions) <= skipped;
3689}
3690
3691/*
3692 * called after sb is ro.
3693 */
3694void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3695{
3696        struct ceph_options *opts = mdsc->fsc->client->options;
3697        struct ceph_mds_session *session;
3698        int i;
3699        int skipped = 0;
3700
3701        dout("close_sessions\n");
3702
3703        /* close sessions */
3704        mutex_lock(&mdsc->mutex);
3705        for (i = 0; i < mdsc->max_sessions; i++) {
3706                session = __ceph_lookup_mds_session(mdsc, i);
3707                if (!session)
3708                        continue;
3709                mutex_unlock(&mdsc->mutex);
3710                mutex_lock(&session->s_mutex);
3711                if (__close_session(mdsc, session) <= 0)
3712                        skipped++;
3713                mutex_unlock(&session->s_mutex);
3714                ceph_put_mds_session(session);
3715                mutex_lock(&mdsc->mutex);
3716        }
3717        mutex_unlock(&mdsc->mutex);
3718
3719        dout("waiting for sessions to close\n");
3720        wait_event_timeout(mdsc->session_close_wq,
3721                           done_closing_sessions(mdsc, skipped),
3722                           ceph_timeout_jiffies(opts->mount_timeout));
3723
3724        /* tear down remaining sessions */
3725        mutex_lock(&mdsc->mutex);
3726        for (i = 0; i < mdsc->max_sessions; i++) {
3727                if (mdsc->sessions[i]) {
3728                        session = get_session(mdsc->sessions[i]);
3729                        __unregister_session(mdsc, session);
3730                        mutex_unlock(&mdsc->mutex);
3731                        mutex_lock(&session->s_mutex);
3732                        remove_session_caps(session);
3733                        mutex_unlock(&session->s_mutex);
3734                        ceph_put_mds_session(session);
3735                        mutex_lock(&mdsc->mutex);
3736                }
3737        }
3738        WARN_ON(!list_empty(&mdsc->cap_delay_list));
3739        mutex_unlock(&mdsc->mutex);
3740
3741        ceph_cleanup_empty_realms(mdsc);
3742
3743        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3744
3745        dout("stopped\n");
3746}
3747
3748void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3749{
3750        struct ceph_mds_session *session;
3751        int mds;
3752
3753        dout("force umount\n");
3754
3755        mutex_lock(&mdsc->mutex);
3756        for (mds = 0; mds < mdsc->max_sessions; mds++) {
3757                session = __ceph_lookup_mds_session(mdsc, mds);
3758                if (!session)
3759                        continue;
3760                mutex_unlock(&mdsc->mutex);
3761                mutex_lock(&session->s_mutex);
3762                __close_session(mdsc, session);
3763                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3764                        cleanup_session_requests(mdsc, session);
3765                        remove_session_caps(session);
3766                }
3767                mutex_unlock(&session->s_mutex);
3768                ceph_put_mds_session(session);
3769                mutex_lock(&mdsc->mutex);
3770                kick_requests(mdsc, mds);
3771        }
3772        __wake_requests(mdsc, &mdsc->waiting_for_map);
3773        mutex_unlock(&mdsc->mutex);
3774}
3775
3776static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3777{
3778        dout("stop\n");
3779        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3780        if (mdsc->mdsmap)
3781                ceph_mdsmap_destroy(mdsc->mdsmap);
3782        kfree(mdsc->sessions);
3783        ceph_caps_finalize(mdsc);
3784        ceph_pool_perm_destroy(mdsc);
3785}
3786
3787void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3788{
3789        struct ceph_mds_client *mdsc = fsc->mdsc;
3790        dout("mdsc_destroy %p\n", mdsc);
3791
3792        /* flush out any connection work with references to us */
3793        ceph_msgr_flush();
3794
3795        ceph_mdsc_stop(mdsc);
3796
3797        fsc->mdsc = NULL;
3798        kfree(mdsc);
3799        dout("mdsc_destroy %p done\n", mdsc);
3800}
3801
3802void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3803{
3804        struct ceph_fs_client *fsc = mdsc->fsc;
3805        const char *mds_namespace = fsc->mount_options->mds_namespace;
3806        void *p = msg->front.iov_base;
3807        void *end = p + msg->front.iov_len;
3808        u32 epoch;
3809        u32 map_len;
3810        u32 num_fs;
3811        u32 mount_fscid = (u32)-1;
3812        u8 struct_v, struct_cv;
3813        int err = -EINVAL;
3814
3815        ceph_decode_need(&p, end, sizeof(u32), bad);
3816        epoch = ceph_decode_32(&p);
3817
3818        dout("handle_fsmap epoch %u\n", epoch);
3819
3820        ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
3821        struct_v = ceph_decode_8(&p);
3822        struct_cv = ceph_decode_8(&p);
3823        map_len = ceph_decode_32(&p);
3824
3825        ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
3826        p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
3827
3828        num_fs = ceph_decode_32(&p);
3829        while (num_fs-- > 0) {
3830                void *info_p, *info_end;
3831                u32 info_len;
3832                u8 info_v, info_cv;
3833                u32 fscid, namelen;
3834
3835                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
3836                info_v = ceph_decode_8(&p);
3837                info_cv = ceph_decode_8(&p);
3838                info_len = ceph_decode_32(&p);
3839                ceph_decode_need(&p, end, info_len, bad);
3840                info_p = p;
3841                info_end = p + info_len;
3842                p = info_end;
3843
3844                ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
3845                fscid = ceph_decode_32(&info_p);
3846                namelen = ceph_decode_32(&info_p);
3847                ceph_decode_need(&info_p, info_end, namelen, bad);
3848
3849                if (mds_namespace &&
3850                    strlen(mds_namespace) == namelen &&
3851                    !strncmp(mds_namespace, (char *)info_p, namelen)) {
3852                        mount_fscid = fscid;
3853                        break;
3854                }
3855        }
3856
3857        ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
3858        if (mount_fscid != (u32)-1) {
3859                fsc->client->monc.fs_cluster_id = mount_fscid;
3860                ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
3861                                   0, true);
3862                ceph_monc_renew_subs(&fsc->client->monc);
3863        } else {
3864                err = -ENOENT;
3865                goto err_out;
3866        }
3867        return;
3868bad:
3869        pr_err("error decoding fsmap\n");
3870err_out:
3871        mutex_lock(&mdsc->mutex);
3872        mdsc->mdsmap_err = -ENOENT;
3873        __wake_requests(mdsc, &mdsc->waiting_for_map);
3874        mutex_unlock(&mdsc->mutex);
3875        return;
3876}
3877
3878/*
3879 * handle mds map update.
3880 */
3881void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3882{
3883        u32 epoch;
3884        u32 maplen;
3885        void *p = msg->front.iov_base;
3886        void *end = p + msg->front.iov_len;
3887        struct ceph_mdsmap *newmap, *oldmap;
3888        struct ceph_fsid fsid;
3889        int err = -EINVAL;
3890
3891        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3892        ceph_decode_copy(&p, &fsid, sizeof(fsid));
3893        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3894                return;
3895        epoch = ceph_decode_32(&p);
3896        maplen = ceph_decode_32(&p);
3897        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3898
3899        /* do we need it? */
3900        mutex_lock(&mdsc->mutex);
3901        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3902                dout("handle_map epoch %u <= our %u\n",
3903                     epoch, mdsc->mdsmap->m_epoch);
3904                mutex_unlock(&mdsc->mutex);
3905                return;
3906        }
3907
3908        newmap = ceph_mdsmap_decode(&p, end);
3909        if (IS_ERR(newmap)) {
3910                err = PTR_ERR(newmap);
3911                goto bad_unlock;
3912        }
3913
3914        /* swap into place */
3915        if (mdsc->mdsmap) {
3916                oldmap = mdsc->mdsmap;
3917                mdsc->mdsmap = newmap;
3918                check_new_map(mdsc, newmap, oldmap);
3919                ceph_mdsmap_destroy(oldmap);
3920        } else {
3921                mdsc->mdsmap = newmap;  /* first mds map */
3922        }
3923        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3924
3925        __wake_requests(mdsc, &mdsc->waiting_for_map);
3926        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3927                          mdsc->mdsmap->m_epoch);
3928
3929        mutex_unlock(&mdsc->mutex);
3930        schedule_delayed(mdsc);
3931        return;
3932
3933bad_unlock:
3934        mutex_unlock(&mdsc->mutex);
3935bad:
3936        pr_err("error decoding mdsmap %d\n", err);
3937        return;
3938}
3939
3940static struct ceph_connection *con_get(struct ceph_connection *con)
3941{
3942        struct ceph_mds_session *s = con->private;
3943
3944        if (get_session(s)) {
3945                dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3946                return con;
3947        }
3948        dout("mdsc con_get %p FAIL\n", s);
3949        return NULL;
3950}
3951
3952static void con_put(struct ceph_connection *con)
3953{
3954        struct ceph_mds_session *s = con->private;
3955
3956        dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3957        ceph_put_mds_session(s);
3958}
3959
3960/*
3961 * if the client is unresponsive for long enough, the mds will kill
3962 * the session entirely.
3963 */
3964static void peer_reset(struct ceph_connection *con)
3965{
3966        struct ceph_mds_session *s = con->private;
3967        struct ceph_mds_client *mdsc = s->s_mdsc;
3968
3969        pr_warn("mds%d closed our session\n", s->s_mds);
3970        send_mds_reconnect(mdsc, s);
3971}
3972
3973static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3974{
3975        struct ceph_mds_session *s = con->private;
3976        struct ceph_mds_client *mdsc = s->s_mdsc;
3977        int type = le16_to_cpu(msg->hdr.type);
3978
3979        mutex_lock(&mdsc->mutex);
3980        if (__verify_registered_session(mdsc, s) < 0) {
3981                mutex_unlock(&mdsc->mutex);
3982                goto out;
3983        }
3984        mutex_unlock(&mdsc->mutex);
3985
3986        switch (type) {
3987        case CEPH_MSG_MDS_MAP:
3988                ceph_mdsc_handle_mdsmap(mdsc, msg);
3989                break;
3990        case CEPH_MSG_FS_MAP_USER:
3991                ceph_mdsc_handle_fsmap(mdsc, msg);
3992                break;
3993        case CEPH_MSG_CLIENT_SESSION:
3994                handle_session(s, msg);
3995                break;
3996        case CEPH_MSG_CLIENT_REPLY:
3997                handle_reply(s, msg);
3998                break;
3999        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4000                handle_forward(mdsc, s, msg);
4001                break;
4002        case CEPH_MSG_CLIENT_CAPS:
4003                ceph_handle_caps(s, msg);
4004                break;
4005        case CEPH_MSG_CLIENT_SNAP:
4006                ceph_handle_snap(mdsc, s, msg);
4007                break;
4008        case CEPH_MSG_CLIENT_LEASE:
4009                handle_lease(mdsc, s, msg);
4010                break;
4011
4012        default:
4013                pr_err("received unknown message type %d %s\n", type,
4014                       ceph_msg_type_name(type));
4015        }
4016out:
4017        ceph_msg_put(msg);
4018}
4019
4020/*
4021 * authentication
4022 */
4023
4024/*
4025 * Note: returned pointer is the address of a structure that's
4026 * managed separately.  Caller must *not* attempt to free it.
4027 */
4028static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4029                                        int *proto, int force_new)
4030{
4031        struct ceph_mds_session *s = con->private;
4032        struct ceph_mds_client *mdsc = s->s_mdsc;
4033        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4034        struct ceph_auth_handshake *auth = &s->s_auth;
4035
4036        if (force_new && auth->authorizer) {
4037                ceph_auth_destroy_authorizer(auth->authorizer);
4038                auth->authorizer = NULL;
4039        }
4040        if (!auth->authorizer) {
4041                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4042                                                      auth);
4043                if (ret)
4044                        return ERR_PTR(ret);
4045        } else {
4046                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4047                                                      auth);
4048                if (ret)
4049                        return ERR_PTR(ret);
4050        }
4051        *proto = ac->protocol;
4052
4053        return auth;
4054}
4055
4056
4057static int verify_authorizer_reply(struct ceph_connection *con)
4058{
4059        struct ceph_mds_session *s = con->private;
4060        struct ceph_mds_client *mdsc = s->s_mdsc;
4061        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4062
4063        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4064}
4065
4066static int invalidate_authorizer(struct ceph_connection *con)
4067{
4068        struct ceph_mds_session *s = con->private;
4069        struct ceph_mds_client *mdsc = s->s_mdsc;
4070        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4071
4072        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4073
4074        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4075}
4076
4077static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4078                                struct ceph_msg_header *hdr, int *skip)
4079{
4080        struct ceph_msg *msg;
4081        int type = (int) le16_to_cpu(hdr->type);
4082        int front_len = (int) le32_to_cpu(hdr->front_len);
4083
4084        if (con->in_msg)
4085                return con->in_msg;
4086
4087        *skip = 0;
4088        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4089        if (!msg) {
4090                pr_err("unable to allocate msg type %d len %d\n",
4091                       type, front_len);
4092                return NULL;
4093        }
4094
4095        return msg;
4096}
4097
4098static int mds_sign_message(struct ceph_msg *msg)
4099{
4100       struct ceph_mds_session *s = msg->con->private;
4101       struct ceph_auth_handshake *auth = &s->s_auth;
4102
4103       return ceph_auth_sign_message(auth, msg);
4104}
4105
4106static int mds_check_message_signature(struct ceph_msg *msg)
4107{
4108       struct ceph_mds_session *s = msg->con->private;
4109       struct ceph_auth_handshake *auth = &s->s_auth;
4110
4111       return ceph_auth_check_message_signature(auth, msg);
4112}
4113
4114static const struct ceph_connection_operations mds_con_ops = {
4115        .get = con_get,
4116        .put = con_put,
4117        .dispatch = dispatch,
4118        .get_authorizer = get_authorizer,
4119        .verify_authorizer_reply = verify_authorizer_reply,
4120        .invalidate_authorizer = invalidate_authorizer,
4121        .peer_reset = peer_reset,
4122        .alloc_msg = mds_alloc_msg,
4123        .sign_message = mds_sign_message,
4124        .check_message_signature = mds_check_message_signature,
4125};
4126
4127/* eof */
4128