linux/fs/ceph/mds_client.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/fs.h>
   5#include <linux/wait.h>
   6#include <linux/slab.h>
   7#include <linux/gfp.h>
   8#include <linux/sched.h>
   9#include <linux/debugfs.h>
  10#include <linux/seq_file.h>
  11#include <linux/ratelimit.h>
  12
  13#include "super.h"
  14#include "mds_client.h"
  15
  16#include <linux/ceph/ceph_features.h>
  17#include <linux/ceph/messenger.h>
  18#include <linux/ceph/decode.h>
  19#include <linux/ceph/pagelist.h>
  20#include <linux/ceph/auth.h>
  21#include <linux/ceph/debugfs.h>
  22
  23/*
  24 * A cluster of MDS (metadata server) daemons is responsible for
  25 * managing the file system namespace (the directory hierarchy and
  26 * inodes) and for coordinating shared access to storage.  Metadata is
  27 * partitioning hierarchically across a number of servers, and that
  28 * partition varies over time as the cluster adjusts the distribution
  29 * in order to balance load.
  30 *
  31 * The MDS client is primarily responsible to managing synchronous
  32 * metadata requests for operations like open, unlink, and so forth.
  33 * If there is a MDS failure, we find out about it when we (possibly
  34 * request and) receive a new MDS map, and can resubmit affected
  35 * requests.
  36 *
  37 * For the most part, though, we take advantage of a lossless
  38 * communications channel to the MDS, and do not need to worry about
  39 * timing out or resubmitting requests.
  40 *
  41 * We maintain a stateful "session" with each MDS we interact with.
  42 * Within each session, we sent periodic heartbeat messages to ensure
  43 * any capabilities or leases we have been issues remain valid.  If
  44 * the session times out and goes stale, our leases and capabilities
  45 * are no longer valid.
  46 */
  47
  48struct ceph_reconnect_state {
  49        int nr_caps;
  50        struct ceph_pagelist *pagelist;
  51        unsigned msg_version;
  52};
  53
  54static void __wake_requests(struct ceph_mds_client *mdsc,
  55                            struct list_head *head);
  56
  57static const struct ceph_connection_operations mds_con_ops;
  58
  59
  60/*
  61 * mds reply parsing
  62 */
  63
  64/*
  65 * parse individual inode info
  66 */
  67static int parse_reply_info_in(void **p, void *end,
  68                               struct ceph_mds_reply_info_in *info,
  69                               u64 features)
  70{
  71        int err = -EIO;
  72
  73        info->in = *p;
  74        *p += sizeof(struct ceph_mds_reply_inode) +
  75                sizeof(*info->in->fragtree.splits) *
  76                le32_to_cpu(info->in->fragtree.nsplits);
  77
  78        ceph_decode_32_safe(p, end, info->symlink_len, bad);
  79        ceph_decode_need(p, end, info->symlink_len, bad);
  80        info->symlink = *p;
  81        *p += info->symlink_len;
  82
  83        if (features & CEPH_FEATURE_DIRLAYOUTHASH)
  84                ceph_decode_copy_safe(p, end, &info->dir_layout,
  85                                      sizeof(info->dir_layout), bad);
  86        else
  87                memset(&info->dir_layout, 0, sizeof(info->dir_layout));
  88
  89        ceph_decode_32_safe(p, end, info->xattr_len, bad);
  90        ceph_decode_need(p, end, info->xattr_len, bad);
  91        info->xattr_data = *p;
  92        *p += info->xattr_len;
  93
  94        if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
  95                ceph_decode_64_safe(p, end, info->inline_version, bad);
  96                ceph_decode_32_safe(p, end, info->inline_len, bad);
  97                ceph_decode_need(p, end, info->inline_len, bad);
  98                info->inline_data = *p;
  99                *p += info->inline_len;
 100        } else
 101                info->inline_version = CEPH_INLINE_NONE;
 102
 103        if (features & CEPH_FEATURE_MDS_QUOTA) {
 104                u8 struct_v, struct_compat;
 105                u32 struct_len;
 106
 107                /*
 108                 * both struct_v and struct_compat are expected to be >= 1
 109                 */
 110                ceph_decode_8_safe(p, end, struct_v, bad);
 111                ceph_decode_8_safe(p, end, struct_compat, bad);
 112                if (!struct_v || !struct_compat)
 113                        goto bad;
 114                ceph_decode_32_safe(p, end, struct_len, bad);
 115                ceph_decode_need(p, end, struct_len, bad);
 116                ceph_decode_64_safe(p, end, info->max_bytes, bad);
 117                ceph_decode_64_safe(p, end, info->max_files, bad);
 118        } else {
 119                info->max_bytes = 0;
 120                info->max_files = 0;
 121        }
 122
 123        info->pool_ns_len = 0;
 124        info->pool_ns_data = NULL;
 125        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
 126                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
 127                if (info->pool_ns_len > 0) {
 128                        ceph_decode_need(p, end, info->pool_ns_len, bad);
 129                        info->pool_ns_data = *p;
 130                        *p += info->pool_ns_len;
 131                }
 132        }
 133
 134        return 0;
 135bad:
 136        return err;
 137}
 138
 139/*
 140 * parse a normal reply, which may contain a (dir+)dentry and/or a
 141 * target inode.
 142 */
 143static int parse_reply_info_trace(void **p, void *end,
 144                                  struct ceph_mds_reply_info_parsed *info,
 145                                  u64 features)
 146{
 147        int err;
 148
 149        if (info->head->is_dentry) {
 150                err = parse_reply_info_in(p, end, &info->diri, features);
 151                if (err < 0)
 152                        goto out_bad;
 153
 154                if (unlikely(*p + sizeof(*info->dirfrag) > end))
 155                        goto bad;
 156                info->dirfrag = *p;
 157                *p += sizeof(*info->dirfrag) +
 158                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
 159                if (unlikely(*p > end))
 160                        goto bad;
 161
 162                ceph_decode_32_safe(p, end, info->dname_len, bad);
 163                ceph_decode_need(p, end, info->dname_len, bad);
 164                info->dname = *p;
 165                *p += info->dname_len;
 166                info->dlease = *p;
 167                *p += sizeof(*info->dlease);
 168        }
 169
 170        if (info->head->is_target) {
 171                err = parse_reply_info_in(p, end, &info->targeti, features);
 172                if (err < 0)
 173                        goto out_bad;
 174        }
 175
 176        if (unlikely(*p != end))
 177                goto bad;
 178        return 0;
 179
 180bad:
 181        err = -EIO;
 182out_bad:
 183        pr_err("problem parsing mds trace %d\n", err);
 184        return err;
 185}
 186
 187/*
 188 * parse readdir results
 189 */
 190static int parse_reply_info_dir(void **p, void *end,
 191                                struct ceph_mds_reply_info_parsed *info,
 192                                u64 features)
 193{
 194        u32 num, i = 0;
 195        int err;
 196
 197        info->dir_dir = *p;
 198        if (*p + sizeof(*info->dir_dir) > end)
 199                goto bad;
 200        *p += sizeof(*info->dir_dir) +
 201                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
 202        if (*p > end)
 203                goto bad;
 204
 205        ceph_decode_need(p, end, sizeof(num) + 2, bad);
 206        num = ceph_decode_32(p);
 207        {
 208                u16 flags = ceph_decode_16(p);
 209                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
 210                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
 211                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
 212                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
 213        }
 214        if (num == 0)
 215                goto done;
 216
 217        BUG_ON(!info->dir_entries);
 218        if ((unsigned long)(info->dir_entries + num) >
 219            (unsigned long)info->dir_entries + info->dir_buf_size) {
 220                pr_err("dir contents are larger than expected\n");
 221                WARN_ON(1);
 222                goto bad;
 223        }
 224
 225        info->dir_nr = num;
 226        while (num) {
 227                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
 228                /* dentry */
 229                ceph_decode_need(p, end, sizeof(u32)*2, bad);
 230                rde->name_len = ceph_decode_32(p);
 231                ceph_decode_need(p, end, rde->name_len, bad);
 232                rde->name = *p;
 233                *p += rde->name_len;
 234                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
 235                rde->lease = *p;
 236                *p += sizeof(struct ceph_mds_reply_lease);
 237
 238                /* inode */
 239                err = parse_reply_info_in(p, end, &rde->inode, features);
 240                if (err < 0)
 241                        goto out_bad;
 242                /* ceph_readdir_prepopulate() will update it */
 243                rde->offset = 0;
 244                i++;
 245                num--;
 246        }
 247
 248done:
 249        if (*p != end)
 250                goto bad;
 251        return 0;
 252
 253bad:
 254        err = -EIO;
 255out_bad:
 256        pr_err("problem parsing dir contents %d\n", err);
 257        return err;
 258}
 259
 260/*
 261 * parse fcntl F_GETLK results
 262 */
 263static int parse_reply_info_filelock(void **p, void *end,
 264                                     struct ceph_mds_reply_info_parsed *info,
 265                                     u64 features)
 266{
 267        if (*p + sizeof(*info->filelock_reply) > end)
 268                goto bad;
 269
 270        info->filelock_reply = *p;
 271        *p += sizeof(*info->filelock_reply);
 272
 273        if (unlikely(*p != end))
 274                goto bad;
 275        return 0;
 276
 277bad:
 278        return -EIO;
 279}
 280
 281/*
 282 * parse create results
 283 */
 284static int parse_reply_info_create(void **p, void *end,
 285                                  struct ceph_mds_reply_info_parsed *info,
 286                                  u64 features)
 287{
 288        if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
 289                if (*p == end) {
 290                        info->has_create_ino = false;
 291                } else {
 292                        info->has_create_ino = true;
 293                        info->ino = ceph_decode_64(p);
 294                }
 295        }
 296
 297        if (unlikely(*p != end))
 298                goto bad;
 299        return 0;
 300
 301bad:
 302        return -EIO;
 303}
 304
 305/*
 306 * parse extra results
 307 */
 308static int parse_reply_info_extra(void **p, void *end,
 309                                  struct ceph_mds_reply_info_parsed *info,
 310                                  u64 features)
 311{
 312        u32 op = le32_to_cpu(info->head->op);
 313
 314        if (op == CEPH_MDS_OP_GETFILELOCK)
 315                return parse_reply_info_filelock(p, end, info, features);
 316        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
 317                return parse_reply_info_dir(p, end, info, features);
 318        else if (op == CEPH_MDS_OP_CREATE)
 319                return parse_reply_info_create(p, end, info, features);
 320        else
 321                return -EIO;
 322}
 323
 324/*
 325 * parse entire mds reply
 326 */
 327static int parse_reply_info(struct ceph_msg *msg,
 328                            struct ceph_mds_reply_info_parsed *info,
 329                            u64 features)
 330{
 331        void *p, *end;
 332        u32 len;
 333        int err;
 334
 335        info->head = msg->front.iov_base;
 336        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
 337        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
 338
 339        /* trace */
 340        ceph_decode_32_safe(&p, end, len, bad);
 341        if (len > 0) {
 342                ceph_decode_need(&p, end, len, bad);
 343                err = parse_reply_info_trace(&p, p+len, info, features);
 344                if (err < 0)
 345                        goto out_bad;
 346        }
 347
 348        /* extra */
 349        ceph_decode_32_safe(&p, end, len, bad);
 350        if (len > 0) {
 351                ceph_decode_need(&p, end, len, bad);
 352                err = parse_reply_info_extra(&p, p+len, info, features);
 353                if (err < 0)
 354                        goto out_bad;
 355        }
 356
 357        /* snap blob */
 358        ceph_decode_32_safe(&p, end, len, bad);
 359        info->snapblob_len = len;
 360        info->snapblob = p;
 361        p += len;
 362
 363        if (p != end)
 364                goto bad;
 365        return 0;
 366
 367bad:
 368        err = -EIO;
 369out_bad:
 370        pr_err("mds parse_reply err %d\n", err);
 371        return err;
 372}
 373
 374static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 375{
 376        if (!info->dir_entries)
 377                return;
 378        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
 379}
 380
 381
 382/*
 383 * sessions
 384 */
 385const char *ceph_session_state_name(int s)
 386{
 387        switch (s) {
 388        case CEPH_MDS_SESSION_NEW: return "new";
 389        case CEPH_MDS_SESSION_OPENING: return "opening";
 390        case CEPH_MDS_SESSION_OPEN: return "open";
 391        case CEPH_MDS_SESSION_HUNG: return "hung";
 392        case CEPH_MDS_SESSION_CLOSING: return "closing";
 393        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 394        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 395        case CEPH_MDS_SESSION_REJECTED: return "rejected";
 396        default: return "???";
 397        }
 398}
 399
 400static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 401{
 402        if (refcount_inc_not_zero(&s->s_ref)) {
 403                dout("mdsc get_session %p %d -> %d\n", s,
 404                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
 405                return s;
 406        } else {
 407                dout("mdsc get_session %p 0 -- FAIL\n", s);
 408                return NULL;
 409        }
 410}
 411
 412void ceph_put_mds_session(struct ceph_mds_session *s)
 413{
 414        dout("mdsc put_session %p %d -> %d\n", s,
 415             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
 416        if (refcount_dec_and_test(&s->s_ref)) {
 417                if (s->s_auth.authorizer)
 418                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
 419                kfree(s);
 420        }
 421}
 422
 423/*
 424 * called under mdsc->mutex
 425 */
 426struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 427                                                   int mds)
 428{
 429        struct ceph_mds_session *session;
 430
 431        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 432                return NULL;
 433        session = mdsc->sessions[mds];
 434        dout("lookup_mds_session %p %d\n", session,
 435             refcount_read(&session->s_ref));
 436        get_session(session);
 437        return session;
 438}
 439
 440static bool __have_session(struct ceph_mds_client *mdsc, int mds)
 441{
 442        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 443                return false;
 444        else
 445                return true;
 446}
 447
 448static int __verify_registered_session(struct ceph_mds_client *mdsc,
 449                                       struct ceph_mds_session *s)
 450{
 451        if (s->s_mds >= mdsc->max_sessions ||
 452            mdsc->sessions[s->s_mds] != s)
 453                return -ENOENT;
 454        return 0;
 455}
 456
 457/*
 458 * create+register a new session for given mds.
 459 * called under mdsc->mutex.
 460 */
 461static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 462                                                 int mds)
 463{
 464        struct ceph_mds_session *s;
 465
 466        if (mds >= mdsc->mdsmap->m_num_mds)
 467                return ERR_PTR(-EINVAL);
 468
 469        s = kzalloc(sizeof(*s), GFP_NOFS);
 470        if (!s)
 471                return ERR_PTR(-ENOMEM);
 472
 473        if (mds >= mdsc->max_sessions) {
 474                int newmax = 1 << get_count_order(mds + 1);
 475                struct ceph_mds_session **sa;
 476
 477                dout("%s: realloc to %d\n", __func__, newmax);
 478                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
 479                if (!sa)
 480                        goto fail_realloc;
 481                if (mdsc->sessions) {
 482                        memcpy(sa, mdsc->sessions,
 483                               mdsc->max_sessions * sizeof(void *));
 484                        kfree(mdsc->sessions);
 485                }
 486                mdsc->sessions = sa;
 487                mdsc->max_sessions = newmax;
 488        }
 489
 490        dout("%s: mds%d\n", __func__, mds);
 491        s->s_mdsc = mdsc;
 492        s->s_mds = mds;
 493        s->s_state = CEPH_MDS_SESSION_NEW;
 494        s->s_ttl = 0;
 495        s->s_seq = 0;
 496        mutex_init(&s->s_mutex);
 497
 498        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 499
 500        spin_lock_init(&s->s_gen_ttl_lock);
 501        s->s_cap_gen = 0;
 502        s->s_cap_ttl = jiffies - 1;
 503
 504        spin_lock_init(&s->s_cap_lock);
 505        s->s_renew_requested = 0;
 506        s->s_renew_seq = 0;
 507        INIT_LIST_HEAD(&s->s_caps);
 508        s->s_nr_caps = 0;
 509        s->s_trim_caps = 0;
 510        refcount_set(&s->s_ref, 1);
 511        INIT_LIST_HEAD(&s->s_waiting);
 512        INIT_LIST_HEAD(&s->s_unsafe);
 513        s->s_num_cap_releases = 0;
 514        s->s_cap_reconnect = 0;
 515        s->s_cap_iterator = NULL;
 516        INIT_LIST_HEAD(&s->s_cap_releases);
 517        INIT_LIST_HEAD(&s->s_cap_flushing);
 518
 519        mdsc->sessions[mds] = s;
 520        atomic_inc(&mdsc->num_sessions);
 521        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 522
 523        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
 524                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 525
 526        return s;
 527
 528fail_realloc:
 529        kfree(s);
 530        return ERR_PTR(-ENOMEM);
 531}
 532
 533/*
 534 * called under mdsc->mutex
 535 */
 536static void __unregister_session(struct ceph_mds_client *mdsc,
 537                               struct ceph_mds_session *s)
 538{
 539        dout("__unregister_session mds%d %p\n", s->s_mds, s);
 540        BUG_ON(mdsc->sessions[s->s_mds] != s);
 541        mdsc->sessions[s->s_mds] = NULL;
 542        ceph_con_close(&s->s_con);
 543        ceph_put_mds_session(s);
 544        atomic_dec(&mdsc->num_sessions);
 545}
 546
 547/*
 548 * drop session refs in request.
 549 *
 550 * should be last request ref, or hold mdsc->mutex
 551 */
 552static void put_request_session(struct ceph_mds_request *req)
 553{
 554        if (req->r_session) {
 555                ceph_put_mds_session(req->r_session);
 556                req->r_session = NULL;
 557        }
 558}
 559
 560void ceph_mdsc_release_request(struct kref *kref)
 561{
 562        struct ceph_mds_request *req = container_of(kref,
 563                                                    struct ceph_mds_request,
 564                                                    r_kref);
 565        destroy_reply_info(&req->r_reply_info);
 566        if (req->r_request)
 567                ceph_msg_put(req->r_request);
 568        if (req->r_reply)
 569                ceph_msg_put(req->r_reply);
 570        if (req->r_inode) {
 571                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
 572                iput(req->r_inode);
 573        }
 574        if (req->r_parent)
 575                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
 576        iput(req->r_target_inode);
 577        if (req->r_dentry)
 578                dput(req->r_dentry);
 579        if (req->r_old_dentry)
 580                dput(req->r_old_dentry);
 581        if (req->r_old_dentry_dir) {
 582                /*
 583                 * track (and drop pins for) r_old_dentry_dir
 584                 * separately, since r_old_dentry's d_parent may have
 585                 * changed between the dir mutex being dropped and
 586                 * this request being freed.
 587                 */
 588                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 589                                  CEPH_CAP_PIN);
 590                iput(req->r_old_dentry_dir);
 591        }
 592        kfree(req->r_path1);
 593        kfree(req->r_path2);
 594        if (req->r_pagelist)
 595                ceph_pagelist_release(req->r_pagelist);
 596        put_request_session(req);
 597        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 598        kfree(req);
 599}
 600
 601DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
 602
 603/*
 604 * lookup session, bump ref if found.
 605 *
 606 * called under mdsc->mutex.
 607 */
 608static struct ceph_mds_request *
 609lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
 610{
 611        struct ceph_mds_request *req;
 612
 613        req = lookup_request(&mdsc->request_tree, tid);
 614        if (req)
 615                ceph_mdsc_get_request(req);
 616
 617        return req;
 618}
 619
 620/*
 621 * Register an in-flight request, and assign a tid.  Link to directory
 622 * are modifying (if any).
 623 *
 624 * Called under mdsc->mutex.
 625 */
 626static void __register_request(struct ceph_mds_client *mdsc,
 627                               struct ceph_mds_request *req,
 628                               struct inode *dir)
 629{
 630        int ret = 0;
 631
 632        req->r_tid = ++mdsc->last_tid;
 633        if (req->r_num_caps) {
 634                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
 635                                        req->r_num_caps);
 636                if (ret < 0) {
 637                        pr_err("__register_request %p "
 638                               "failed to reserve caps: %d\n", req, ret);
 639                        /* set req->r_err to fail early from __do_request */
 640                        req->r_err = ret;
 641                        return;
 642                }
 643        }
 644        dout("__register_request %p tid %lld\n", req, req->r_tid);
 645        ceph_mdsc_get_request(req);
 646        insert_request(&mdsc->request_tree, req);
 647
 648        req->r_uid = current_fsuid();
 649        req->r_gid = current_fsgid();
 650
 651        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
 652                mdsc->oldest_tid = req->r_tid;
 653
 654        if (dir) {
 655                ihold(dir);
 656                req->r_unsafe_dir = dir;
 657        }
 658}
 659
 660static void __unregister_request(struct ceph_mds_client *mdsc,
 661                                 struct ceph_mds_request *req)
 662{
 663        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 664
 665        /* Never leave an unregistered request on an unsafe list! */
 666        list_del_init(&req->r_unsafe_item);
 667
 668        if (req->r_tid == mdsc->oldest_tid) {
 669                struct rb_node *p = rb_next(&req->r_node);
 670                mdsc->oldest_tid = 0;
 671                while (p) {
 672                        struct ceph_mds_request *next_req =
 673                                rb_entry(p, struct ceph_mds_request, r_node);
 674                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
 675                                mdsc->oldest_tid = next_req->r_tid;
 676                                break;
 677                        }
 678                        p = rb_next(p);
 679                }
 680        }
 681
 682        erase_request(&mdsc->request_tree, req);
 683
 684        if (req->r_unsafe_dir  &&
 685            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 686                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
 687                spin_lock(&ci->i_unsafe_lock);
 688                list_del_init(&req->r_unsafe_dir_item);
 689                spin_unlock(&ci->i_unsafe_lock);
 690        }
 691        if (req->r_target_inode &&
 692            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 693                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
 694                spin_lock(&ci->i_unsafe_lock);
 695                list_del_init(&req->r_unsafe_target_item);
 696                spin_unlock(&ci->i_unsafe_lock);
 697        }
 698
 699        if (req->r_unsafe_dir) {
 700                iput(req->r_unsafe_dir);
 701                req->r_unsafe_dir = NULL;
 702        }
 703
 704        complete_all(&req->r_safe_completion);
 705
 706        ceph_mdsc_put_request(req);
 707}
 708
 709/*
 710 * Walk back up the dentry tree until we hit a dentry representing a
 711 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 712 * when calling this) to ensure that the objects won't disappear while we're
 713 * working with them. Once we hit a candidate dentry, we attempt to take a
 714 * reference to it, and return that as the result.
 715 */
 716static struct inode *get_nonsnap_parent(struct dentry *dentry)
 717{
 718        struct inode *inode = NULL;
 719
 720        while (dentry && !IS_ROOT(dentry)) {
 721                inode = d_inode_rcu(dentry);
 722                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
 723                        break;
 724                dentry = dentry->d_parent;
 725        }
 726        if (inode)
 727                inode = igrab(inode);
 728        return inode;
 729}
 730
 731/*
 732 * Choose mds to send request to next.  If there is a hint set in the
 733 * request (e.g., due to a prior forward hint from the mds), use that.
 734 * Otherwise, consult frag tree and/or caps to identify the
 735 * appropriate mds.  If all else fails, choose randomly.
 736 *
 737 * Called under mdsc->mutex.
 738 */
 739static int __choose_mds(struct ceph_mds_client *mdsc,
 740                        struct ceph_mds_request *req)
 741{
 742        struct inode *inode;
 743        struct ceph_inode_info *ci;
 744        struct ceph_cap *cap;
 745        int mode = req->r_direct_mode;
 746        int mds = -1;
 747        u32 hash = req->r_direct_hash;
 748        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
 749
 750        /*
 751         * is there a specific mds we should try?  ignore hint if we have
 752         * no session and the mds is not up (active or recovering).
 753         */
 754        if (req->r_resend_mds >= 0 &&
 755            (__have_session(mdsc, req->r_resend_mds) ||
 756             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
 757                dout("choose_mds using resend_mds mds%d\n",
 758                     req->r_resend_mds);
 759                return req->r_resend_mds;
 760        }
 761
 762        if (mode == USE_RANDOM_MDS)
 763                goto random;
 764
 765        inode = NULL;
 766        if (req->r_inode) {
 767                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
 768                        inode = req->r_inode;
 769                        ihold(inode);
 770                } else {
 771                        /* req->r_dentry is non-null for LSSNAP request */
 772                        rcu_read_lock();
 773                        inode = get_nonsnap_parent(req->r_dentry);
 774                        rcu_read_unlock();
 775                        dout("__choose_mds using snapdir's parent %p\n", inode);
 776                }
 777        } else if (req->r_dentry) {
 778                /* ignore race with rename; old or new d_parent is okay */
 779                struct dentry *parent;
 780                struct inode *dir;
 781
 782                rcu_read_lock();
 783                parent = req->r_dentry->d_parent;
 784                dir = req->r_parent ? : d_inode_rcu(parent);
 785
 786                if (!dir || dir->i_sb != mdsc->fsc->sb) {
 787                        /*  not this fs or parent went negative */
 788                        inode = d_inode(req->r_dentry);
 789                        if (inode)
 790                                ihold(inode);
 791                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
 792                        /* direct snapped/virtual snapdir requests
 793                         * based on parent dir inode */
 794                        inode = get_nonsnap_parent(parent);
 795                        dout("__choose_mds using nonsnap parent %p\n", inode);
 796                } else {
 797                        /* dentry target */
 798                        inode = d_inode(req->r_dentry);
 799                        if (!inode || mode == USE_AUTH_MDS) {
 800                                /* dir + name */
 801                                inode = igrab(dir);
 802                                hash = ceph_dentry_hash(dir, req->r_dentry);
 803                                is_hash = true;
 804                        } else {
 805                                ihold(inode);
 806                        }
 807                }
 808                rcu_read_unlock();
 809        }
 810
 811        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
 812             (int)hash, mode);
 813        if (!inode)
 814                goto random;
 815        ci = ceph_inode(inode);
 816
 817        if (is_hash && S_ISDIR(inode->i_mode)) {
 818                struct ceph_inode_frag frag;
 819                int found;
 820
 821                ceph_choose_frag(ci, hash, &frag, &found);
 822                if (found) {
 823                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
 824                                u8 r;
 825
 826                                /* choose a random replica */
 827                                get_random_bytes(&r, 1);
 828                                r %= frag.ndist;
 829                                mds = frag.dist[r];
 830                                dout("choose_mds %p %llx.%llx "
 831                                     "frag %u mds%d (%d/%d)\n",
 832                                     inode, ceph_vinop(inode),
 833                                     frag.frag, mds,
 834                                     (int)r, frag.ndist);
 835                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 836                                    CEPH_MDS_STATE_ACTIVE)
 837                                        goto out;
 838                        }
 839
 840                        /* since this file/dir wasn't known to be
 841                         * replicated, then we want to look for the
 842                         * authoritative mds. */
 843                        mode = USE_AUTH_MDS;
 844                        if (frag.mds >= 0) {
 845                                /* choose auth mds */
 846                                mds = frag.mds;
 847                                dout("choose_mds %p %llx.%llx "
 848                                     "frag %u mds%d (auth)\n",
 849                                     inode, ceph_vinop(inode), frag.frag, mds);
 850                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 851                                    CEPH_MDS_STATE_ACTIVE)
 852                                        goto out;
 853                        }
 854                }
 855        }
 856
 857        spin_lock(&ci->i_ceph_lock);
 858        cap = NULL;
 859        if (mode == USE_AUTH_MDS)
 860                cap = ci->i_auth_cap;
 861        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
 862                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 863        if (!cap) {
 864                spin_unlock(&ci->i_ceph_lock);
 865                iput(inode);
 866                goto random;
 867        }
 868        mds = cap->session->s_mds;
 869        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
 870             inode, ceph_vinop(inode), mds,
 871             cap == ci->i_auth_cap ? "auth " : "", cap);
 872        spin_unlock(&ci->i_ceph_lock);
 873out:
 874        iput(inode);
 875        return mds;
 876
 877random:
 878        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
 879        dout("choose_mds chose random mds%d\n", mds);
 880        return mds;
 881}
 882
 883
 884/*
 885 * session messages
 886 */
 887static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 888{
 889        struct ceph_msg *msg;
 890        struct ceph_mds_session_head *h;
 891
 892        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
 893                           false);
 894        if (!msg) {
 895                pr_err("create_session_msg ENOMEM creating msg\n");
 896                return NULL;
 897        }
 898        h = msg->front.iov_base;
 899        h->op = cpu_to_le32(op);
 900        h->seq = cpu_to_le64(seq);
 901
 902        return msg;
 903}
 904
 905/*
 906 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 907 * to include additional client metadata fields.
 908 */
 909static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 910{
 911        struct ceph_msg *msg;
 912        struct ceph_mds_session_head *h;
 913        int i = -1;
 914        int metadata_bytes = 0;
 915        int metadata_key_count = 0;
 916        struct ceph_options *opt = mdsc->fsc->client->options;
 917        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
 918        void *p;
 919
 920        const char* metadata[][2] = {
 921                {"hostname", mdsc->nodename},
 922                {"kernel_version", init_utsname()->release},
 923                {"entity_id", opt->name ? : ""},
 924                {"root", fsopt->server_path ? : "/"},
 925                {NULL, NULL}
 926        };
 927
 928        /* Calculate serialized length of metadata */
 929        metadata_bytes = 4;  /* map length */
 930        for (i = 0; metadata[i][0]; ++i) {
 931                metadata_bytes += 8 + strlen(metadata[i][0]) +
 932                        strlen(metadata[i][1]);
 933                metadata_key_count++;
 934        }
 935
 936        /* Allocate the message */
 937        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
 938                           GFP_NOFS, false);
 939        if (!msg) {
 940                pr_err("create_session_msg ENOMEM creating msg\n");
 941                return NULL;
 942        }
 943        h = msg->front.iov_base;
 944        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
 945        h->seq = cpu_to_le64(seq);
 946
 947        /*
 948         * Serialize client metadata into waiting buffer space, using
 949         * the format that userspace expects for map<string, string>
 950         *
 951         * ClientSession messages with metadata are v2
 952         */
 953        msg->hdr.version = cpu_to_le16(2);
 954        msg->hdr.compat_version = cpu_to_le16(1);
 955
 956        /* The write pointer, following the session_head structure */
 957        p = msg->front.iov_base + sizeof(*h);
 958
 959        /* Number of entries in the map */
 960        ceph_encode_32(&p, metadata_key_count);
 961
 962        /* Two length-prefixed strings for each entry in the map */
 963        for (i = 0; metadata[i][0]; ++i) {
 964                size_t const key_len = strlen(metadata[i][0]);
 965                size_t const val_len = strlen(metadata[i][1]);
 966
 967                ceph_encode_32(&p, key_len);
 968                memcpy(p, metadata[i][0], key_len);
 969                p += key_len;
 970                ceph_encode_32(&p, val_len);
 971                memcpy(p, metadata[i][1], val_len);
 972                p += val_len;
 973        }
 974
 975        return msg;
 976}
 977
 978/*
 979 * send session open request.
 980 *
 981 * called under mdsc->mutex
 982 */
 983static int __open_session(struct ceph_mds_client *mdsc,
 984                          struct ceph_mds_session *session)
 985{
 986        struct ceph_msg *msg;
 987        int mstate;
 988        int mds = session->s_mds;
 989
 990        /* wait for mds to go active? */
 991        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
 992        dout("open_session to mds%d (%s)\n", mds,
 993             ceph_mds_state_name(mstate));
 994        session->s_state = CEPH_MDS_SESSION_OPENING;
 995        session->s_renew_requested = jiffies;
 996
 997        /* send connect message */
 998        msg = create_session_open_msg(mdsc, session->s_seq);
 999        if (!msg)
1000                return -ENOMEM;
1001        ceph_con_send(&session->s_con, msg);
1002        return 0;
1003}
1004
1005/*
1006 * open sessions for any export targets for the given mds
1007 *
1008 * called under mdsc->mutex
1009 */
1010static struct ceph_mds_session *
1011__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1012{
1013        struct ceph_mds_session *session;
1014
1015        session = __ceph_lookup_mds_session(mdsc, target);
1016        if (!session) {
1017                session = register_session(mdsc, target);
1018                if (IS_ERR(session))
1019                        return session;
1020        }
1021        if (session->s_state == CEPH_MDS_SESSION_NEW ||
1022            session->s_state == CEPH_MDS_SESSION_CLOSING)
1023                __open_session(mdsc, session);
1024
1025        return session;
1026}
1027
1028struct ceph_mds_session *
1029ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1030{
1031        struct ceph_mds_session *session;
1032
1033        dout("open_export_target_session to mds%d\n", target);
1034
1035        mutex_lock(&mdsc->mutex);
1036        session = __open_export_target_session(mdsc, target);
1037        mutex_unlock(&mdsc->mutex);
1038
1039        return session;
1040}
1041
1042static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1043                                          struct ceph_mds_session *session)
1044{
1045        struct ceph_mds_info *mi;
1046        struct ceph_mds_session *ts;
1047        int i, mds = session->s_mds;
1048
1049        if (mds >= mdsc->mdsmap->m_num_mds)
1050                return;
1051
1052        mi = &mdsc->mdsmap->m_info[mds];
1053        dout("open_export_target_sessions for mds%d (%d targets)\n",
1054             session->s_mds, mi->num_export_targets);
1055
1056        for (i = 0; i < mi->num_export_targets; i++) {
1057                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1058                if (!IS_ERR(ts))
1059                        ceph_put_mds_session(ts);
1060        }
1061}
1062
1063void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1064                                           struct ceph_mds_session *session)
1065{
1066        mutex_lock(&mdsc->mutex);
1067        __open_export_target_sessions(mdsc, session);
1068        mutex_unlock(&mdsc->mutex);
1069}
1070
1071/*
1072 * session caps
1073 */
1074
1075static void detach_cap_releases(struct ceph_mds_session *session,
1076                                struct list_head *target)
1077{
1078        lockdep_assert_held(&session->s_cap_lock);
1079
1080        list_splice_init(&session->s_cap_releases, target);
1081        session->s_num_cap_releases = 0;
1082        dout("dispose_cap_releases mds%d\n", session->s_mds);
1083}
1084
1085static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1086                                 struct list_head *dispose)
1087{
1088        while (!list_empty(dispose)) {
1089                struct ceph_cap *cap;
1090                /* zero out the in-progress message */
1091                cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1092                list_del(&cap->session_caps);
1093                ceph_put_cap(mdsc, cap);
1094        }
1095}
1096
1097static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1098                                     struct ceph_mds_session *session)
1099{
1100        struct ceph_mds_request *req;
1101        struct rb_node *p;
1102
1103        dout("cleanup_session_requests mds%d\n", session->s_mds);
1104        mutex_lock(&mdsc->mutex);
1105        while (!list_empty(&session->s_unsafe)) {
1106                req = list_first_entry(&session->s_unsafe,
1107                                       struct ceph_mds_request, r_unsafe_item);
1108                pr_warn_ratelimited(" dropping unsafe request %llu\n",
1109                                    req->r_tid);
1110                __unregister_request(mdsc, req);
1111        }
1112        /* zero r_attempts, so kick_requests() will re-send requests */
1113        p = rb_first(&mdsc->request_tree);
1114        while (p) {
1115                req = rb_entry(p, struct ceph_mds_request, r_node);
1116                p = rb_next(p);
1117                if (req->r_session &&
1118                    req->r_session->s_mds == session->s_mds)
1119                        req->r_attempts = 0;
1120        }
1121        mutex_unlock(&mdsc->mutex);
1122}
1123
1124/*
1125 * Helper to safely iterate over all caps associated with a session, with
1126 * special care taken to handle a racing __ceph_remove_cap().
1127 *
1128 * Caller must hold session s_mutex.
1129 */
1130static int iterate_session_caps(struct ceph_mds_session *session,
1131                                 int (*cb)(struct inode *, struct ceph_cap *,
1132                                            void *), void *arg)
1133{
1134        struct list_head *p;
1135        struct ceph_cap *cap;
1136        struct inode *inode, *last_inode = NULL;
1137        struct ceph_cap *old_cap = NULL;
1138        int ret;
1139
1140        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1141        spin_lock(&session->s_cap_lock);
1142        p = session->s_caps.next;
1143        while (p != &session->s_caps) {
1144                cap = list_entry(p, struct ceph_cap, session_caps);
1145                inode = igrab(&cap->ci->vfs_inode);
1146                if (!inode) {
1147                        p = p->next;
1148                        continue;
1149                }
1150                session->s_cap_iterator = cap;
1151                spin_unlock(&session->s_cap_lock);
1152
1153                if (last_inode) {
1154                        iput(last_inode);
1155                        last_inode = NULL;
1156                }
1157                if (old_cap) {
1158                        ceph_put_cap(session->s_mdsc, old_cap);
1159                        old_cap = NULL;
1160                }
1161
1162                ret = cb(inode, cap, arg);
1163                last_inode = inode;
1164
1165                spin_lock(&session->s_cap_lock);
1166                p = p->next;
1167                if (!cap->ci) {
1168                        dout("iterate_session_caps  finishing cap %p removal\n",
1169                             cap);
1170                        BUG_ON(cap->session != session);
1171                        cap->session = NULL;
1172                        list_del_init(&cap->session_caps);
1173                        session->s_nr_caps--;
1174                        if (cap->queue_release) {
1175                                list_add_tail(&cap->session_caps,
1176                                              &session->s_cap_releases);
1177                                session->s_num_cap_releases++;
1178                        } else {
1179                                old_cap = cap;  /* put_cap it w/o locks held */
1180                        }
1181                }
1182                if (ret < 0)
1183                        goto out;
1184        }
1185        ret = 0;
1186out:
1187        session->s_cap_iterator = NULL;
1188        spin_unlock(&session->s_cap_lock);
1189
1190        iput(last_inode);
1191        if (old_cap)
1192                ceph_put_cap(session->s_mdsc, old_cap);
1193
1194        return ret;
1195}
1196
1197static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1198                                  void *arg)
1199{
1200        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1201        struct ceph_inode_info *ci = ceph_inode(inode);
1202        LIST_HEAD(to_remove);
1203        bool drop = false;
1204        bool invalidate = false;
1205
1206        dout("removing cap %p, ci is %p, inode is %p\n",
1207             cap, ci, &ci->vfs_inode);
1208        spin_lock(&ci->i_ceph_lock);
1209        __ceph_remove_cap(cap, false);
1210        if (!ci->i_auth_cap) {
1211                struct ceph_cap_flush *cf;
1212                struct ceph_mds_client *mdsc = fsc->mdsc;
1213
1214                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1215
1216                if (ci->i_wrbuffer_ref > 0 &&
1217                    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1218                        invalidate = true;
1219
1220                while (!list_empty(&ci->i_cap_flush_list)) {
1221                        cf = list_first_entry(&ci->i_cap_flush_list,
1222                                              struct ceph_cap_flush, i_list);
1223                        list_move(&cf->i_list, &to_remove);
1224                }
1225
1226                spin_lock(&mdsc->cap_dirty_lock);
1227
1228                list_for_each_entry(cf, &to_remove, i_list)
1229                        list_del(&cf->g_list);
1230
1231                if (!list_empty(&ci->i_dirty_item)) {
1232                        pr_warn_ratelimited(
1233                                " dropping dirty %s state for %p %lld\n",
1234                                ceph_cap_string(ci->i_dirty_caps),
1235                                inode, ceph_ino(inode));
1236                        ci->i_dirty_caps = 0;
1237                        list_del_init(&ci->i_dirty_item);
1238                        drop = true;
1239                }
1240                if (!list_empty(&ci->i_flushing_item)) {
1241                        pr_warn_ratelimited(
1242                                " dropping dirty+flushing %s state for %p %lld\n",
1243                                ceph_cap_string(ci->i_flushing_caps),
1244                                inode, ceph_ino(inode));
1245                        ci->i_flushing_caps = 0;
1246                        list_del_init(&ci->i_flushing_item);
1247                        mdsc->num_cap_flushing--;
1248                        drop = true;
1249                }
1250                spin_unlock(&mdsc->cap_dirty_lock);
1251
1252                if (atomic_read(&ci->i_filelock_ref) > 0) {
1253                        /* make further file lock syscall return -EIO */
1254                        ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1255                        pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1256                                            inode, ceph_ino(inode));
1257                }
1258
1259                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1260                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1261                        ci->i_prealloc_cap_flush = NULL;
1262                }
1263        }
1264        spin_unlock(&ci->i_ceph_lock);
1265        while (!list_empty(&to_remove)) {
1266                struct ceph_cap_flush *cf;
1267                cf = list_first_entry(&to_remove,
1268                                      struct ceph_cap_flush, i_list);
1269                list_del(&cf->i_list);
1270                ceph_free_cap_flush(cf);
1271        }
1272
1273        wake_up_all(&ci->i_cap_wq);
1274        if (invalidate)
1275                ceph_queue_invalidate(inode);
1276        if (drop)
1277                iput(inode);
1278        return 0;
1279}
1280
1281/*
1282 * caller must hold session s_mutex
1283 */
1284static void remove_session_caps(struct ceph_mds_session *session)
1285{
1286        struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1287        struct super_block *sb = fsc->sb;
1288        LIST_HEAD(dispose);
1289
1290        dout("remove_session_caps on %p\n", session);
1291        iterate_session_caps(session, remove_session_caps_cb, fsc);
1292
1293        wake_up_all(&fsc->mdsc->cap_flushing_wq);
1294
1295        spin_lock(&session->s_cap_lock);
1296        if (session->s_nr_caps > 0) {
1297                struct inode *inode;
1298                struct ceph_cap *cap, *prev = NULL;
1299                struct ceph_vino vino;
1300                /*
1301                 * iterate_session_caps() skips inodes that are being
1302                 * deleted, we need to wait until deletions are complete.
1303                 * __wait_on_freeing_inode() is designed for the job,
1304                 * but it is not exported, so use lookup inode function
1305                 * to access it.
1306                 */
1307                while (!list_empty(&session->s_caps)) {
1308                        cap = list_entry(session->s_caps.next,
1309                                         struct ceph_cap, session_caps);
1310                        if (cap == prev)
1311                                break;
1312                        prev = cap;
1313                        vino = cap->ci->i_vino;
1314                        spin_unlock(&session->s_cap_lock);
1315
1316                        inode = ceph_find_inode(sb, vino);
1317                        iput(inode);
1318
1319                        spin_lock(&session->s_cap_lock);
1320                }
1321        }
1322
1323        // drop cap expires and unlock s_cap_lock
1324        detach_cap_releases(session, &dispose);
1325
1326        BUG_ON(session->s_nr_caps > 0);
1327        BUG_ON(!list_empty(&session->s_cap_flushing));
1328        spin_unlock(&session->s_cap_lock);
1329        dispose_cap_releases(session->s_mdsc, &dispose);
1330}
1331
1332/*
1333 * wake up any threads waiting on this session's caps.  if the cap is
1334 * old (didn't get renewed on the client reconnect), remove it now.
1335 *
1336 * caller must hold s_mutex.
1337 */
1338static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1339                              void *arg)
1340{
1341        struct ceph_inode_info *ci = ceph_inode(inode);
1342
1343        if (arg) {
1344                spin_lock(&ci->i_ceph_lock);
1345                ci->i_wanted_max_size = 0;
1346                ci->i_requested_max_size = 0;
1347                spin_unlock(&ci->i_ceph_lock);
1348        }
1349        wake_up_all(&ci->i_cap_wq);
1350        return 0;
1351}
1352
1353static void wake_up_session_caps(struct ceph_mds_session *session,
1354                                 int reconnect)
1355{
1356        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1357        iterate_session_caps(session, wake_up_session_cb,
1358                             (void *)(unsigned long)reconnect);
1359}
1360
1361/*
1362 * Send periodic message to MDS renewing all currently held caps.  The
1363 * ack will reset the expiration for all caps from this session.
1364 *
1365 * caller holds s_mutex
1366 */
1367static int send_renew_caps(struct ceph_mds_client *mdsc,
1368                           struct ceph_mds_session *session)
1369{
1370        struct ceph_msg *msg;
1371        int state;
1372
1373        if (time_after_eq(jiffies, session->s_cap_ttl) &&
1374            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1375                pr_info("mds%d caps stale\n", session->s_mds);
1376        session->s_renew_requested = jiffies;
1377
1378        /* do not try to renew caps until a recovering mds has reconnected
1379         * with its clients. */
1380        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1381        if (state < CEPH_MDS_STATE_RECONNECT) {
1382                dout("send_renew_caps ignoring mds%d (%s)\n",
1383                     session->s_mds, ceph_mds_state_name(state));
1384                return 0;
1385        }
1386
1387        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1388                ceph_mds_state_name(state));
1389        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1390                                 ++session->s_renew_seq);
1391        if (!msg)
1392                return -ENOMEM;
1393        ceph_con_send(&session->s_con, msg);
1394        return 0;
1395}
1396
1397static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1398                             struct ceph_mds_session *session, u64 seq)
1399{
1400        struct ceph_msg *msg;
1401
1402        dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1403             session->s_mds, ceph_session_state_name(session->s_state), seq);
1404        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1405        if (!msg)
1406                return -ENOMEM;
1407        ceph_con_send(&session->s_con, msg);
1408        return 0;
1409}
1410
1411
1412/*
1413 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1414 *
1415 * Called under session->s_mutex
1416 */
1417static void renewed_caps(struct ceph_mds_client *mdsc,
1418                         struct ceph_mds_session *session, int is_renew)
1419{
1420        int was_stale;
1421        int wake = 0;
1422
1423        spin_lock(&session->s_cap_lock);
1424        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1425
1426        session->s_cap_ttl = session->s_renew_requested +
1427                mdsc->mdsmap->m_session_timeout*HZ;
1428
1429        if (was_stale) {
1430                if (time_before(jiffies, session->s_cap_ttl)) {
1431                        pr_info("mds%d caps renewed\n", session->s_mds);
1432                        wake = 1;
1433                } else {
1434                        pr_info("mds%d caps still stale\n", session->s_mds);
1435                }
1436        }
1437        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1438             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1439             time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1440        spin_unlock(&session->s_cap_lock);
1441
1442        if (wake)
1443                wake_up_session_caps(session, 0);
1444}
1445
1446/*
1447 * send a session close request
1448 */
1449static int request_close_session(struct ceph_mds_client *mdsc,
1450                                 struct ceph_mds_session *session)
1451{
1452        struct ceph_msg *msg;
1453
1454        dout("request_close_session mds%d state %s seq %lld\n",
1455             session->s_mds, ceph_session_state_name(session->s_state),
1456             session->s_seq);
1457        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1458        if (!msg)
1459                return -ENOMEM;
1460        ceph_con_send(&session->s_con, msg);
1461        return 1;
1462}
1463
1464/*
1465 * Called with s_mutex held.
1466 */
1467static int __close_session(struct ceph_mds_client *mdsc,
1468                         struct ceph_mds_session *session)
1469{
1470        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1471                return 0;
1472        session->s_state = CEPH_MDS_SESSION_CLOSING;
1473        return request_close_session(mdsc, session);
1474}
1475
1476static bool drop_negative_children(struct dentry *dentry)
1477{
1478        struct dentry *child;
1479        bool all_negative = true;
1480
1481        if (!d_is_dir(dentry))
1482                goto out;
1483
1484        spin_lock(&dentry->d_lock);
1485        list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1486                if (d_really_is_positive(child)) {
1487                        all_negative = false;
1488                        break;
1489                }
1490        }
1491        spin_unlock(&dentry->d_lock);
1492
1493        if (all_negative)
1494                shrink_dcache_parent(dentry);
1495out:
1496        return all_negative;
1497}
1498
1499/*
1500 * Trim old(er) caps.
1501 *
1502 * Because we can't cache an inode without one or more caps, we do
1503 * this indirectly: if a cap is unused, we prune its aliases, at which
1504 * point the inode will hopefully get dropped to.
1505 *
1506 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1507 * memory pressure from the MDS, though, so it needn't be perfect.
1508 */
1509static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1510{
1511        struct ceph_mds_session *session = arg;
1512        struct ceph_inode_info *ci = ceph_inode(inode);
1513        int used, wanted, oissued, mine;
1514
1515        if (session->s_trim_caps <= 0)
1516                return -1;
1517
1518        spin_lock(&ci->i_ceph_lock);
1519        mine = cap->issued | cap->implemented;
1520        used = __ceph_caps_used(ci);
1521        wanted = __ceph_caps_file_wanted(ci);
1522        oissued = __ceph_caps_issued_other(ci, cap);
1523
1524        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1525             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1526             ceph_cap_string(used), ceph_cap_string(wanted));
1527        if (cap == ci->i_auth_cap) {
1528                if (ci->i_dirty_caps || ci->i_flushing_caps ||
1529                    !list_empty(&ci->i_cap_snaps))
1530                        goto out;
1531                if ((used | wanted) & CEPH_CAP_ANY_WR)
1532                        goto out;
1533                /* Note: it's possible that i_filelock_ref becomes non-zero
1534                 * after dropping auth caps. It doesn't hurt because reply
1535                 * of lock mds request will re-add auth caps. */
1536                if (atomic_read(&ci->i_filelock_ref) > 0)
1537                        goto out;
1538        }
1539        /* The inode has cached pages, but it's no longer used.
1540         * we can safely drop it */
1541        if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1542            !(oissued & CEPH_CAP_FILE_CACHE)) {
1543          used = 0;
1544          oissued = 0;
1545        }
1546        if ((used | wanted) & ~oissued & mine)
1547                goto out;   /* we need these caps */
1548
1549        if (oissued) {
1550                /* we aren't the only cap.. just remove us */
1551                __ceph_remove_cap(cap, true);
1552                session->s_trim_caps--;
1553        } else {
1554                struct dentry *dentry;
1555                /* try dropping referring dentries */
1556                spin_unlock(&ci->i_ceph_lock);
1557                dentry = d_find_any_alias(inode);
1558                if (dentry && drop_negative_children(dentry)) {
1559                        int count;
1560                        dput(dentry);
1561                        d_prune_aliases(inode);
1562                        count = atomic_read(&inode->i_count);
1563                        if (count == 1)
1564                                session->s_trim_caps--;
1565                        dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1566                             inode, cap, count);
1567                } else {
1568                        dput(dentry);
1569                }
1570                return 0;
1571        }
1572
1573out:
1574        spin_unlock(&ci->i_ceph_lock);
1575        return 0;
1576}
1577
1578/*
1579 * Trim session cap count down to some max number.
1580 */
1581int ceph_trim_caps(struct ceph_mds_client *mdsc,
1582                   struct ceph_mds_session *session,
1583                   int max_caps)
1584{
1585        int trim_caps = session->s_nr_caps - max_caps;
1586
1587        dout("trim_caps mds%d start: %d / %d, trim %d\n",
1588             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1589        if (trim_caps > 0) {
1590                session->s_trim_caps = trim_caps;
1591                iterate_session_caps(session, trim_caps_cb, session);
1592                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1593                     session->s_mds, session->s_nr_caps, max_caps,
1594                        trim_caps - session->s_trim_caps);
1595                session->s_trim_caps = 0;
1596        }
1597
1598        ceph_send_cap_releases(mdsc, session);
1599        return 0;
1600}
1601
1602static int check_caps_flush(struct ceph_mds_client *mdsc,
1603                            u64 want_flush_tid)
1604{
1605        int ret = 1;
1606
1607        spin_lock(&mdsc->cap_dirty_lock);
1608        if (!list_empty(&mdsc->cap_flush_list)) {
1609                struct ceph_cap_flush *cf =
1610                        list_first_entry(&mdsc->cap_flush_list,
1611                                         struct ceph_cap_flush, g_list);
1612                if (cf->tid <= want_flush_tid) {
1613                        dout("check_caps_flush still flushing tid "
1614                             "%llu <= %llu\n", cf->tid, want_flush_tid);
1615                        ret = 0;
1616                }
1617        }
1618        spin_unlock(&mdsc->cap_dirty_lock);
1619        return ret;
1620}
1621
1622/*
1623 * flush all dirty inode data to disk.
1624 *
1625 * returns true if we've flushed through want_flush_tid
1626 */
1627static void wait_caps_flush(struct ceph_mds_client *mdsc,
1628                            u64 want_flush_tid)
1629{
1630        dout("check_caps_flush want %llu\n", want_flush_tid);
1631
1632        wait_event(mdsc->cap_flushing_wq,
1633                   check_caps_flush(mdsc, want_flush_tid));
1634
1635        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1636}
1637
1638/*
1639 * called under s_mutex
1640 */
1641void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1642                            struct ceph_mds_session *session)
1643{
1644        struct ceph_msg *msg = NULL;
1645        struct ceph_mds_cap_release *head;
1646        struct ceph_mds_cap_item *item;
1647        struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1648        struct ceph_cap *cap;
1649        LIST_HEAD(tmp_list);
1650        int num_cap_releases;
1651        __le32  barrier, *cap_barrier;
1652
1653        down_read(&osdc->lock);
1654        barrier = cpu_to_le32(osdc->epoch_barrier);
1655        up_read(&osdc->lock);
1656
1657        spin_lock(&session->s_cap_lock);
1658again:
1659        list_splice_init(&session->s_cap_releases, &tmp_list);
1660        num_cap_releases = session->s_num_cap_releases;
1661        session->s_num_cap_releases = 0;
1662        spin_unlock(&session->s_cap_lock);
1663
1664        while (!list_empty(&tmp_list)) {
1665                if (!msg) {
1666                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1667                                        PAGE_SIZE, GFP_NOFS, false);
1668                        if (!msg)
1669                                goto out_err;
1670                        head = msg->front.iov_base;
1671                        head->num = cpu_to_le32(0);
1672                        msg->front.iov_len = sizeof(*head);
1673
1674                        msg->hdr.version = cpu_to_le16(2);
1675                        msg->hdr.compat_version = cpu_to_le16(1);
1676                }
1677
1678                cap = list_first_entry(&tmp_list, struct ceph_cap,
1679                                        session_caps);
1680                list_del(&cap->session_caps);
1681                num_cap_releases--;
1682
1683                head = msg->front.iov_base;
1684                le32_add_cpu(&head->num, 1);
1685                item = msg->front.iov_base + msg->front.iov_len;
1686                item->ino = cpu_to_le64(cap->cap_ino);
1687                item->cap_id = cpu_to_le64(cap->cap_id);
1688                item->migrate_seq = cpu_to_le32(cap->mseq);
1689                item->seq = cpu_to_le32(cap->issue_seq);
1690                msg->front.iov_len += sizeof(*item);
1691
1692                ceph_put_cap(mdsc, cap);
1693
1694                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1695                        // Append cap_barrier field
1696                        cap_barrier = msg->front.iov_base + msg->front.iov_len;
1697                        *cap_barrier = barrier;
1698                        msg->front.iov_len += sizeof(*cap_barrier);
1699
1700                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1701                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1702                        ceph_con_send(&session->s_con, msg);
1703                        msg = NULL;
1704                }
1705        }
1706
1707        BUG_ON(num_cap_releases != 0);
1708
1709        spin_lock(&session->s_cap_lock);
1710        if (!list_empty(&session->s_cap_releases))
1711                goto again;
1712        spin_unlock(&session->s_cap_lock);
1713
1714        if (msg) {
1715                // Append cap_barrier field
1716                cap_barrier = msg->front.iov_base + msg->front.iov_len;
1717                *cap_barrier = barrier;
1718                msg->front.iov_len += sizeof(*cap_barrier);
1719
1720                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1721                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1722                ceph_con_send(&session->s_con, msg);
1723        }
1724        return;
1725out_err:
1726        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1727                session->s_mds);
1728        spin_lock(&session->s_cap_lock);
1729        list_splice(&tmp_list, &session->s_cap_releases);
1730        session->s_num_cap_releases += num_cap_releases;
1731        spin_unlock(&session->s_cap_lock);
1732}
1733
1734/*
1735 * requests
1736 */
1737
1738int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1739                                    struct inode *dir)
1740{
1741        struct ceph_inode_info *ci = ceph_inode(dir);
1742        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1743        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1744        size_t size = sizeof(struct ceph_mds_reply_dir_entry);
1745        int order, num_entries;
1746
1747        spin_lock(&ci->i_ceph_lock);
1748        num_entries = ci->i_files + ci->i_subdirs;
1749        spin_unlock(&ci->i_ceph_lock);
1750        num_entries = max(num_entries, 1);
1751        num_entries = min(num_entries, opt->max_readdir);
1752
1753        order = get_order(size * num_entries);
1754        while (order >= 0) {
1755                rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
1756                                                             __GFP_NOWARN,
1757                                                             order);
1758                if (rinfo->dir_entries)
1759                        break;
1760                order--;
1761        }
1762        if (!rinfo->dir_entries)
1763                return -ENOMEM;
1764
1765        num_entries = (PAGE_SIZE << order) / size;
1766        num_entries = min(num_entries, opt->max_readdir);
1767
1768        rinfo->dir_buf_size = PAGE_SIZE << order;
1769        req->r_num_caps = num_entries + 1;
1770        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1771        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1772        return 0;
1773}
1774
1775/*
1776 * Create an mds request.
1777 */
1778struct ceph_mds_request *
1779ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1780{
1781        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1782
1783        if (!req)
1784                return ERR_PTR(-ENOMEM);
1785
1786        mutex_init(&req->r_fill_mutex);
1787        req->r_mdsc = mdsc;
1788        req->r_started = jiffies;
1789        req->r_resend_mds = -1;
1790        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1791        INIT_LIST_HEAD(&req->r_unsafe_target_item);
1792        req->r_fmode = -1;
1793        kref_init(&req->r_kref);
1794        RB_CLEAR_NODE(&req->r_node);
1795        INIT_LIST_HEAD(&req->r_wait);
1796        init_completion(&req->r_completion);
1797        init_completion(&req->r_safe_completion);
1798        INIT_LIST_HEAD(&req->r_unsafe_item);
1799
1800        req->r_stamp = timespec_trunc(current_kernel_time(), mdsc->fsc->sb->s_time_gran);
1801
1802        req->r_op = op;
1803        req->r_direct_mode = mode;
1804        return req;
1805}
1806
1807/*
1808 * return oldest (lowest) request, tid in request tree, 0 if none.
1809 *
1810 * called under mdsc->mutex.
1811 */
1812static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1813{
1814        if (RB_EMPTY_ROOT(&mdsc->request_tree))
1815                return NULL;
1816        return rb_entry(rb_first(&mdsc->request_tree),
1817                        struct ceph_mds_request, r_node);
1818}
1819
1820static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1821{
1822        return mdsc->oldest_tid;
1823}
1824
1825/*
1826 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1827 * on build_path_from_dentry in fs/cifs/dir.c.
1828 *
1829 * If @stop_on_nosnap, generate path relative to the first non-snapped
1830 * inode.
1831 *
1832 * Encode hidden .snap dirs as a double /, i.e.
1833 *   foo/.snap/bar -> foo//bar
1834 */
1835char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1836                           int stop_on_nosnap)
1837{
1838        struct dentry *temp;
1839        char *path;
1840        int len, pos;
1841        unsigned seq;
1842
1843        if (!dentry)
1844                return ERR_PTR(-EINVAL);
1845
1846retry:
1847        len = 0;
1848        seq = read_seqbegin(&rename_lock);
1849        rcu_read_lock();
1850        for (temp = dentry; !IS_ROOT(temp);) {
1851                struct inode *inode = d_inode(temp);
1852                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1853                        len++;  /* slash only */
1854                else if (stop_on_nosnap && inode &&
1855                         ceph_snap(inode) == CEPH_NOSNAP)
1856                        break;
1857                else
1858                        len += 1 + temp->d_name.len;
1859                temp = temp->d_parent;
1860        }
1861        rcu_read_unlock();
1862        if (len)
1863                len--;  /* no leading '/' */
1864
1865        path = kmalloc(len+1, GFP_NOFS);
1866        if (!path)
1867                return ERR_PTR(-ENOMEM);
1868        pos = len;
1869        path[pos] = 0;  /* trailing null */
1870        rcu_read_lock();
1871        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1872                struct inode *inode;
1873
1874                spin_lock(&temp->d_lock);
1875                inode = d_inode(temp);
1876                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1877                        dout("build_path path+%d: %p SNAPDIR\n",
1878                             pos, temp);
1879                } else if (stop_on_nosnap && inode &&
1880                           ceph_snap(inode) == CEPH_NOSNAP) {
1881                        spin_unlock(&temp->d_lock);
1882                        break;
1883                } else {
1884                        pos -= temp->d_name.len;
1885                        if (pos < 0) {
1886                                spin_unlock(&temp->d_lock);
1887                                break;
1888                        }
1889                        strncpy(path + pos, temp->d_name.name,
1890                                temp->d_name.len);
1891                }
1892                spin_unlock(&temp->d_lock);
1893                if (pos)
1894                        path[--pos] = '/';
1895                temp = temp->d_parent;
1896        }
1897        rcu_read_unlock();
1898        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1899                pr_err("build_path did not end path lookup where "
1900                       "expected, namelen is %d, pos is %d\n", len, pos);
1901                /* presumably this is only possible if racing with a
1902                   rename of one of the parent directories (we can not
1903                   lock the dentries above us to prevent this, but
1904                   retrying should be harmless) */
1905                kfree(path);
1906                goto retry;
1907        }
1908
1909        *base = ceph_ino(d_inode(temp));
1910        *plen = len;
1911        dout("build_path on %p %d built %llx '%.*s'\n",
1912             dentry, d_count(dentry), *base, len, path);
1913        return path;
1914}
1915
1916static int build_dentry_path(struct dentry *dentry, struct inode *dir,
1917                             const char **ppath, int *ppathlen, u64 *pino,
1918                             int *pfreepath)
1919{
1920        char *path;
1921
1922        rcu_read_lock();
1923        if (!dir)
1924                dir = d_inode_rcu(dentry->d_parent);
1925        if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
1926                *pino = ceph_ino(dir);
1927                rcu_read_unlock();
1928                *ppath = dentry->d_name.name;
1929                *ppathlen = dentry->d_name.len;
1930                return 0;
1931        }
1932        rcu_read_unlock();
1933        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1934        if (IS_ERR(path))
1935                return PTR_ERR(path);
1936        *ppath = path;
1937        *pfreepath = 1;
1938        return 0;
1939}
1940
1941static int build_inode_path(struct inode *inode,
1942                            const char **ppath, int *ppathlen, u64 *pino,
1943                            int *pfreepath)
1944{
1945        struct dentry *dentry;
1946        char *path;
1947
1948        if (ceph_snap(inode) == CEPH_NOSNAP) {
1949                *pino = ceph_ino(inode);
1950                *ppathlen = 0;
1951                return 0;
1952        }
1953        dentry = d_find_alias(inode);
1954        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1955        dput(dentry);
1956        if (IS_ERR(path))
1957                return PTR_ERR(path);
1958        *ppath = path;
1959        *pfreepath = 1;
1960        return 0;
1961}
1962
1963/*
1964 * request arguments may be specified via an inode *, a dentry *, or
1965 * an explicit ino+path.
1966 */
1967static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1968                                  struct inode *rdiri, const char *rpath,
1969                                  u64 rino, const char **ppath, int *pathlen,
1970                                  u64 *ino, int *freepath)
1971{
1972        int r = 0;
1973
1974        if (rinode) {
1975                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1976                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1977                     ceph_snap(rinode));
1978        } else if (rdentry) {
1979                r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
1980                                        freepath);
1981                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1982                     *ppath);
1983        } else if (rpath || rino) {
1984                *ino = rino;
1985                *ppath = rpath;
1986                *pathlen = rpath ? strlen(rpath) : 0;
1987                dout(" path %.*s\n", *pathlen, rpath);
1988        }
1989
1990        return r;
1991}
1992
1993/*
1994 * called under mdsc->mutex
1995 */
1996static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1997                                               struct ceph_mds_request *req,
1998                                               int mds, bool drop_cap_releases)
1999{
2000        struct ceph_msg *msg;
2001        struct ceph_mds_request_head *head;
2002        const char *path1 = NULL;
2003        const char *path2 = NULL;
2004        u64 ino1 = 0, ino2 = 0;
2005        int pathlen1 = 0, pathlen2 = 0;
2006        int freepath1 = 0, freepath2 = 0;
2007        int len;
2008        u16 releases;
2009        void *p, *end;
2010        int ret;
2011
2012        ret = set_request_path_attr(req->r_inode, req->r_dentry,
2013                              req->r_parent, req->r_path1, req->r_ino1.ino,
2014                              &path1, &pathlen1, &ino1, &freepath1);
2015        if (ret < 0) {
2016                msg = ERR_PTR(ret);
2017                goto out;
2018        }
2019
2020        ret = set_request_path_attr(NULL, req->r_old_dentry,
2021                              req->r_old_dentry_dir,
2022                              req->r_path2, req->r_ino2.ino,
2023                              &path2, &pathlen2, &ino2, &freepath2);
2024        if (ret < 0) {
2025                msg = ERR_PTR(ret);
2026                goto out_free1;
2027        }
2028
2029        len = sizeof(*head) +
2030                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2031                sizeof(struct ceph_timespec);
2032
2033        /* calculate (max) length for cap releases */
2034        len += sizeof(struct ceph_mds_request_release) *
2035                (!!req->r_inode_drop + !!req->r_dentry_drop +
2036                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2037        if (req->r_dentry_drop)
2038                len += req->r_dentry->d_name.len;
2039        if (req->r_old_dentry_drop)
2040                len += req->r_old_dentry->d_name.len;
2041
2042        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
2043        if (!msg) {
2044                msg = ERR_PTR(-ENOMEM);
2045                goto out_free2;
2046        }
2047
2048        msg->hdr.version = cpu_to_le16(2);
2049        msg->hdr.tid = cpu_to_le64(req->r_tid);
2050
2051        head = msg->front.iov_base;
2052        p = msg->front.iov_base + sizeof(*head);
2053        end = msg->front.iov_base + msg->front.iov_len;
2054
2055        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2056        head->op = cpu_to_le32(req->r_op);
2057        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2058        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2059        head->args = req->r_args;
2060
2061        ceph_encode_filepath(&p, end, ino1, path1);
2062        ceph_encode_filepath(&p, end, ino2, path2);
2063
2064        /* make note of release offset, in case we need to replay */
2065        req->r_request_release_offset = p - msg->front.iov_base;
2066
2067        /* cap releases */
2068        releases = 0;
2069        if (req->r_inode_drop)
2070                releases += ceph_encode_inode_release(&p,
2071                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2072                      mds, req->r_inode_drop, req->r_inode_unless, 0);
2073        if (req->r_dentry_drop)
2074                releases += ceph_encode_dentry_release(&p, req->r_dentry,
2075                                req->r_parent, mds, req->r_dentry_drop,
2076                                req->r_dentry_unless);
2077        if (req->r_old_dentry_drop)
2078                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2079                                req->r_old_dentry_dir, mds,
2080                                req->r_old_dentry_drop,
2081                                req->r_old_dentry_unless);
2082        if (req->r_old_inode_drop)
2083                releases += ceph_encode_inode_release(&p,
2084                      d_inode(req->r_old_dentry),
2085                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2086
2087        if (drop_cap_releases) {
2088                releases = 0;
2089                p = msg->front.iov_base + req->r_request_release_offset;
2090        }
2091
2092        head->num_releases = cpu_to_le16(releases);
2093
2094        /* time stamp */
2095        {
2096                struct ceph_timespec ts;
2097                ceph_encode_timespec(&ts, &req->r_stamp);
2098                ceph_encode_copy(&p, &ts, sizeof(ts));
2099        }
2100
2101        BUG_ON(p > end);
2102        msg->front.iov_len = p - msg->front.iov_base;
2103        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2104
2105        if (req->r_pagelist) {
2106                struct ceph_pagelist *pagelist = req->r_pagelist;
2107                refcount_inc(&pagelist->refcnt);
2108                ceph_msg_data_add_pagelist(msg, pagelist);
2109                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2110        } else {
2111                msg->hdr.data_len = 0;
2112        }
2113
2114        msg->hdr.data_off = cpu_to_le16(0);
2115
2116out_free2:
2117        if (freepath2)
2118                kfree((char *)path2);
2119out_free1:
2120        if (freepath1)
2121                kfree((char *)path1);
2122out:
2123        return msg;
2124}
2125
2126/*
2127 * called under mdsc->mutex if error, under no mutex if
2128 * success.
2129 */
2130static void complete_request(struct ceph_mds_client *mdsc,
2131                             struct ceph_mds_request *req)
2132{
2133        if (req->r_callback)
2134                req->r_callback(mdsc, req);
2135        else
2136                complete_all(&req->r_completion);
2137}
2138
2139/*
2140 * called under mdsc->mutex
2141 */
2142static int __prepare_send_request(struct ceph_mds_client *mdsc,
2143                                  struct ceph_mds_request *req,
2144                                  int mds, bool drop_cap_releases)
2145{
2146        struct ceph_mds_request_head *rhead;
2147        struct ceph_msg *msg;
2148        int flags = 0;
2149
2150        req->r_attempts++;
2151        if (req->r_inode) {
2152                struct ceph_cap *cap =
2153                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2154
2155                if (cap)
2156                        req->r_sent_on_mseq = cap->mseq;
2157                else
2158                        req->r_sent_on_mseq = -1;
2159        }
2160        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2161             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2162
2163        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2164                void *p;
2165                /*
2166                 * Replay.  Do not regenerate message (and rebuild
2167                 * paths, etc.); just use the original message.
2168                 * Rebuilding paths will break for renames because
2169                 * d_move mangles the src name.
2170                 */
2171                msg = req->r_request;
2172                rhead = msg->front.iov_base;
2173
2174                flags = le32_to_cpu(rhead->flags);
2175                flags |= CEPH_MDS_FLAG_REPLAY;
2176                rhead->flags = cpu_to_le32(flags);
2177
2178                if (req->r_target_inode)
2179                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2180
2181                rhead->num_retry = req->r_attempts - 1;
2182
2183                /* remove cap/dentry releases from message */
2184                rhead->num_releases = 0;
2185
2186                /* time stamp */
2187                p = msg->front.iov_base + req->r_request_release_offset;
2188                {
2189                        struct ceph_timespec ts;
2190                        ceph_encode_timespec(&ts, &req->r_stamp);
2191                        ceph_encode_copy(&p, &ts, sizeof(ts));
2192                }
2193
2194                msg->front.iov_len = p - msg->front.iov_base;
2195                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2196                return 0;
2197        }
2198
2199        if (req->r_request) {
2200                ceph_msg_put(req->r_request);
2201                req->r_request = NULL;
2202        }
2203        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2204        if (IS_ERR(msg)) {
2205                req->r_err = PTR_ERR(msg);
2206                return PTR_ERR(msg);
2207        }
2208        req->r_request = msg;
2209
2210        rhead = msg->front.iov_base;
2211        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2212        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2213                flags |= CEPH_MDS_FLAG_REPLAY;
2214        if (req->r_parent)
2215                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2216        rhead->flags = cpu_to_le32(flags);
2217        rhead->num_fwd = req->r_num_fwd;
2218        rhead->num_retry = req->r_attempts - 1;
2219        rhead->ino = 0;
2220
2221        dout(" r_parent = %p\n", req->r_parent);
2222        return 0;
2223}
2224
2225/*
2226 * send request, or put it on the appropriate wait list.
2227 */
2228static int __do_request(struct ceph_mds_client *mdsc,
2229                        struct ceph_mds_request *req)
2230{
2231        struct ceph_mds_session *session = NULL;
2232        int mds = -1;
2233        int err = 0;
2234
2235        if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2236                if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2237                        __unregister_request(mdsc, req);
2238                goto out;
2239        }
2240
2241        if (req->r_timeout &&
2242            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2243                dout("do_request timed out\n");
2244                err = -EIO;
2245                goto finish;
2246        }
2247        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2248                dout("do_request forced umount\n");
2249                err = -EIO;
2250                goto finish;
2251        }
2252        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2253                if (mdsc->mdsmap_err) {
2254                        err = mdsc->mdsmap_err;
2255                        dout("do_request mdsmap err %d\n", err);
2256                        goto finish;
2257                }
2258                if (mdsc->mdsmap->m_epoch == 0) {
2259                        dout("do_request no mdsmap, waiting for map\n");
2260                        list_add(&req->r_wait, &mdsc->waiting_for_map);
2261                        goto finish;
2262                }
2263                if (!(mdsc->fsc->mount_options->flags &
2264                      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2265                    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2266                        err = -ENOENT;
2267                        pr_info("probably no mds server is up\n");
2268                        goto finish;
2269                }
2270        }
2271
2272        put_request_session(req);
2273
2274        mds = __choose_mds(mdsc, req);
2275        if (mds < 0 ||
2276            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2277                dout("do_request no mds or not active, waiting for map\n");
2278                list_add(&req->r_wait, &mdsc->waiting_for_map);
2279                goto out;
2280        }
2281
2282        /* get, open session */
2283        session = __ceph_lookup_mds_session(mdsc, mds);
2284        if (!session) {
2285                session = register_session(mdsc, mds);
2286                if (IS_ERR(session)) {
2287                        err = PTR_ERR(session);
2288                        goto finish;
2289                }
2290        }
2291        req->r_session = get_session(session);
2292
2293        dout("do_request mds%d session %p state %s\n", mds, session,
2294             ceph_session_state_name(session->s_state));
2295        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2296            session->s_state != CEPH_MDS_SESSION_HUNG) {
2297                if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2298                        err = -EACCES;
2299                        goto out_session;
2300                }
2301                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2302                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2303                        __open_session(mdsc, session);
2304                list_add(&req->r_wait, &session->s_waiting);
2305                goto out_session;
2306        }
2307
2308        /* send request */
2309        req->r_resend_mds = -1;   /* forget any previous mds hint */
2310
2311        if (req->r_request_started == 0)   /* note request start time */
2312                req->r_request_started = jiffies;
2313
2314        err = __prepare_send_request(mdsc, req, mds, false);
2315        if (!err) {
2316                ceph_msg_get(req->r_request);
2317                ceph_con_send(&session->s_con, req->r_request);
2318        }
2319
2320out_session:
2321        ceph_put_mds_session(session);
2322finish:
2323        if (err) {
2324                dout("__do_request early error %d\n", err);
2325                req->r_err = err;
2326                complete_request(mdsc, req);
2327                __unregister_request(mdsc, req);
2328        }
2329out:
2330        return err;
2331}
2332
2333/*
2334 * called under mdsc->mutex
2335 */
2336static void __wake_requests(struct ceph_mds_client *mdsc,
2337                            struct list_head *head)
2338{
2339        struct ceph_mds_request *req;
2340        LIST_HEAD(tmp_list);
2341
2342        list_splice_init(head, &tmp_list);
2343
2344        while (!list_empty(&tmp_list)) {
2345                req = list_entry(tmp_list.next,
2346                                 struct ceph_mds_request, r_wait);
2347                list_del_init(&req->r_wait);
2348                dout(" wake request %p tid %llu\n", req, req->r_tid);
2349                __do_request(mdsc, req);
2350        }
2351}
2352
2353/*
2354 * Wake up threads with requests pending for @mds, so that they can
2355 * resubmit their requests to a possibly different mds.
2356 */
2357static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2358{
2359        struct ceph_mds_request *req;
2360        struct rb_node *p = rb_first(&mdsc->request_tree);
2361
2362        dout("kick_requests mds%d\n", mds);
2363        while (p) {
2364                req = rb_entry(p, struct ceph_mds_request, r_node);
2365                p = rb_next(p);
2366                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2367                        continue;
2368                if (req->r_attempts > 0)
2369                        continue; /* only new requests */
2370                if (req->r_session &&
2371                    req->r_session->s_mds == mds) {
2372                        dout(" kicking tid %llu\n", req->r_tid);
2373                        list_del_init(&req->r_wait);
2374                        __do_request(mdsc, req);
2375                }
2376        }
2377}
2378
2379void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2380                              struct ceph_mds_request *req)
2381{
2382        dout("submit_request on %p\n", req);
2383        mutex_lock(&mdsc->mutex);
2384        __register_request(mdsc, req, NULL);
2385        __do_request(mdsc, req);
2386        mutex_unlock(&mdsc->mutex);
2387}
2388
2389/*
2390 * Synchrously perform an mds request.  Take care of all of the
2391 * session setup, forwarding, retry details.
2392 */
2393int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2394                         struct inode *dir,
2395                         struct ceph_mds_request *req)
2396{
2397        int err;
2398
2399        dout("do_request on %p\n", req);
2400
2401        /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2402        if (req->r_inode)
2403                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2404        if (req->r_parent)
2405                ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2406        if (req->r_old_dentry_dir)
2407                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2408                                  CEPH_CAP_PIN);
2409
2410        /* issue */
2411        mutex_lock(&mdsc->mutex);
2412        __register_request(mdsc, req, dir);
2413        __do_request(mdsc, req);
2414
2415        if (req->r_err) {
2416                err = req->r_err;
2417                goto out;
2418        }
2419
2420        /* wait */
2421        mutex_unlock(&mdsc->mutex);
2422        dout("do_request waiting\n");
2423        if (!req->r_timeout && req->r_wait_for_completion) {
2424                err = req->r_wait_for_completion(mdsc, req);
2425        } else {
2426                long timeleft = wait_for_completion_killable_timeout(
2427                                        &req->r_completion,
2428                                        ceph_timeout_jiffies(req->r_timeout));
2429                if (timeleft > 0)
2430                        err = 0;
2431                else if (!timeleft)
2432                        err = -EIO;  /* timed out */
2433                else
2434                        err = timeleft;  /* killed */
2435        }
2436        dout("do_request waited, got %d\n", err);
2437        mutex_lock(&mdsc->mutex);
2438
2439        /* only abort if we didn't race with a real reply */
2440        if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2441                err = le32_to_cpu(req->r_reply_info.head->result);
2442        } else if (err < 0) {
2443                dout("aborted request %lld with %d\n", req->r_tid, err);
2444
2445                /*
2446                 * ensure we aren't running concurrently with
2447                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2448                 * rely on locks (dir mutex) held by our caller.
2449                 */
2450                mutex_lock(&req->r_fill_mutex);
2451                req->r_err = err;
2452                set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2453                mutex_unlock(&req->r_fill_mutex);
2454
2455                if (req->r_parent &&
2456                    (req->r_op & CEPH_MDS_OP_WRITE))
2457                        ceph_invalidate_dir_request(req);
2458        } else {
2459                err = req->r_err;
2460        }
2461
2462out:
2463        mutex_unlock(&mdsc->mutex);
2464        dout("do_request %p done, result %d\n", req, err);
2465        return err;
2466}
2467
2468/*
2469 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2470 * namespace request.
2471 */
2472void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2473{
2474        struct inode *dir = req->r_parent;
2475        struct inode *old_dir = req->r_old_dentry_dir;
2476
2477        dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2478
2479        ceph_dir_clear_complete(dir);
2480        if (old_dir)
2481                ceph_dir_clear_complete(old_dir);
2482        if (req->r_dentry)
2483                ceph_invalidate_dentry_lease(req->r_dentry);
2484        if (req->r_old_dentry)
2485                ceph_invalidate_dentry_lease(req->r_old_dentry);
2486}
2487
2488/*
2489 * Handle mds reply.
2490 *
2491 * We take the session mutex and parse and process the reply immediately.
2492 * This preserves the logical ordering of replies, capabilities, etc., sent
2493 * by the MDS as they are applied to our local cache.
2494 */
2495static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2496{
2497        struct ceph_mds_client *mdsc = session->s_mdsc;
2498        struct ceph_mds_request *req;
2499        struct ceph_mds_reply_head *head = msg->front.iov_base;
2500        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2501        struct ceph_snap_realm *realm;
2502        u64 tid;
2503        int err, result;
2504        int mds = session->s_mds;
2505
2506        if (msg->front.iov_len < sizeof(*head)) {
2507                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2508                ceph_msg_dump(msg);
2509                return;
2510        }
2511
2512        /* get request, session */
2513        tid = le64_to_cpu(msg->hdr.tid);
2514        mutex_lock(&mdsc->mutex);
2515        req = lookup_get_request(mdsc, tid);
2516        if (!req) {
2517                dout("handle_reply on unknown tid %llu\n", tid);
2518                mutex_unlock(&mdsc->mutex);
2519                return;
2520        }
2521        dout("handle_reply %p\n", req);
2522
2523        /* correct session? */
2524        if (req->r_session != session) {
2525                pr_err("mdsc_handle_reply got %llu on session mds%d"
2526                       " not mds%d\n", tid, session->s_mds,
2527                       req->r_session ? req->r_session->s_mds : -1);
2528                mutex_unlock(&mdsc->mutex);
2529                goto out;
2530        }
2531
2532        /* dup? */
2533        if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2534            (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2535                pr_warn("got a dup %s reply on %llu from mds%d\n",
2536                           head->safe ? "safe" : "unsafe", tid, mds);
2537                mutex_unlock(&mdsc->mutex);
2538                goto out;
2539        }
2540        if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2541                pr_warn("got unsafe after safe on %llu from mds%d\n",
2542                           tid, mds);
2543                mutex_unlock(&mdsc->mutex);
2544                goto out;
2545        }
2546
2547        result = le32_to_cpu(head->result);
2548
2549        /*
2550         * Handle an ESTALE
2551         * if we're not talking to the authority, send to them
2552         * if the authority has changed while we weren't looking,
2553         * send to new authority
2554         * Otherwise we just have to return an ESTALE
2555         */
2556        if (result == -ESTALE) {
2557                dout("got ESTALE on request %llu\n", req->r_tid);
2558                req->r_resend_mds = -1;
2559                if (req->r_direct_mode != USE_AUTH_MDS) {
2560                        dout("not using auth, setting for that now\n");
2561                        req->r_direct_mode = USE_AUTH_MDS;
2562                        __do_request(mdsc, req);
2563                        mutex_unlock(&mdsc->mutex);
2564                        goto out;
2565                } else  {
2566                        int mds = __choose_mds(mdsc, req);
2567                        if (mds >= 0 && mds != req->r_session->s_mds) {
2568                                dout("but auth changed, so resending\n");
2569                                __do_request(mdsc, req);
2570                                mutex_unlock(&mdsc->mutex);
2571                                goto out;
2572                        }
2573                }
2574                dout("have to return ESTALE on request %llu\n", req->r_tid);
2575        }
2576
2577
2578        if (head->safe) {
2579                set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2580                __unregister_request(mdsc, req);
2581
2582                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2583                        /*
2584                         * We already handled the unsafe response, now do the
2585                         * cleanup.  No need to examine the response; the MDS
2586                         * doesn't include any result info in the safe
2587                         * response.  And even if it did, there is nothing
2588                         * useful we could do with a revised return value.
2589                         */
2590                        dout("got safe reply %llu, mds%d\n", tid, mds);
2591
2592                        /* last unsafe request during umount? */
2593                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2594                                complete_all(&mdsc->safe_umount_waiters);
2595                        mutex_unlock(&mdsc->mutex);
2596                        goto out;
2597                }
2598        } else {
2599                set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2600                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2601                if (req->r_unsafe_dir) {
2602                        struct ceph_inode_info *ci =
2603                                        ceph_inode(req->r_unsafe_dir);
2604                        spin_lock(&ci->i_unsafe_lock);
2605                        list_add_tail(&req->r_unsafe_dir_item,
2606                                      &ci->i_unsafe_dirops);
2607                        spin_unlock(&ci->i_unsafe_lock);
2608                }
2609        }
2610
2611        dout("handle_reply tid %lld result %d\n", tid, result);
2612        rinfo = &req->r_reply_info;
2613        err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2614        mutex_unlock(&mdsc->mutex);
2615
2616        mutex_lock(&session->s_mutex);
2617        if (err < 0) {
2618                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2619                ceph_msg_dump(msg);
2620                goto out_err;
2621        }
2622
2623        /* snap trace */
2624        realm = NULL;
2625        if (rinfo->snapblob_len) {
2626                down_write(&mdsc->snap_rwsem);
2627                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2628                                rinfo->snapblob + rinfo->snapblob_len,
2629                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2630                                &realm);
2631                downgrade_write(&mdsc->snap_rwsem);
2632        } else {
2633                down_read(&mdsc->snap_rwsem);
2634        }
2635
2636        /* insert trace into our cache */
2637        mutex_lock(&req->r_fill_mutex);
2638        current->journal_info = req;
2639        err = ceph_fill_trace(mdsc->fsc->sb, req);
2640        if (err == 0) {
2641                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2642                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2643                        ceph_readdir_prepopulate(req, req->r_session);
2644                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2645        }
2646        current->journal_info = NULL;
2647        mutex_unlock(&req->r_fill_mutex);
2648
2649        up_read(&mdsc->snap_rwsem);
2650        if (realm)
2651                ceph_put_snap_realm(mdsc, realm);
2652
2653        if (err == 0 && req->r_target_inode &&
2654            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2655                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2656                spin_lock(&ci->i_unsafe_lock);
2657                list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2658                spin_unlock(&ci->i_unsafe_lock);
2659        }
2660out_err:
2661        mutex_lock(&mdsc->mutex);
2662        if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2663                if (err) {
2664                        req->r_err = err;
2665                } else {
2666                        req->r_reply =  ceph_msg_get(msg);
2667                        set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2668                }
2669        } else {
2670                dout("reply arrived after request %lld was aborted\n", tid);
2671        }
2672        mutex_unlock(&mdsc->mutex);
2673
2674        mutex_unlock(&session->s_mutex);
2675
2676        /* kick calling process */
2677        complete_request(mdsc, req);
2678out:
2679        ceph_mdsc_put_request(req);
2680        return;
2681}
2682
2683
2684
2685/*
2686 * handle mds notification that our request has been forwarded.
2687 */
2688static void handle_forward(struct ceph_mds_client *mdsc,
2689                           struct ceph_mds_session *session,
2690                           struct ceph_msg *msg)
2691{
2692        struct ceph_mds_request *req;
2693        u64 tid = le64_to_cpu(msg->hdr.tid);
2694        u32 next_mds;
2695        u32 fwd_seq;
2696        int err = -EINVAL;
2697        void *p = msg->front.iov_base;
2698        void *end = p + msg->front.iov_len;
2699
2700        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2701        next_mds = ceph_decode_32(&p);
2702        fwd_seq = ceph_decode_32(&p);
2703
2704        mutex_lock(&mdsc->mutex);
2705        req = lookup_get_request(mdsc, tid);
2706        if (!req) {
2707                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2708                goto out;  /* dup reply? */
2709        }
2710
2711        if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2712                dout("forward tid %llu aborted, unregistering\n", tid);
2713                __unregister_request(mdsc, req);
2714        } else if (fwd_seq <= req->r_num_fwd) {
2715                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2716                     tid, next_mds, req->r_num_fwd, fwd_seq);
2717        } else {
2718                /* resend. forward race not possible; mds would drop */
2719                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2720                BUG_ON(req->r_err);
2721                BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
2722                req->r_attempts = 0;
2723                req->r_num_fwd = fwd_seq;
2724                req->r_resend_mds = next_mds;
2725                put_request_session(req);
2726                __do_request(mdsc, req);
2727        }
2728        ceph_mdsc_put_request(req);
2729out:
2730        mutex_unlock(&mdsc->mutex);
2731        return;
2732
2733bad:
2734        pr_err("mdsc_handle_forward decode error err=%d\n", err);
2735}
2736
2737/*
2738 * handle a mds session control message
2739 */
2740static void handle_session(struct ceph_mds_session *session,
2741                           struct ceph_msg *msg)
2742{
2743        struct ceph_mds_client *mdsc = session->s_mdsc;
2744        u32 op;
2745        u64 seq;
2746        int mds = session->s_mds;
2747        struct ceph_mds_session_head *h = msg->front.iov_base;
2748        int wake = 0;
2749
2750        /* decode */
2751        if (msg->front.iov_len != sizeof(*h))
2752                goto bad;
2753        op = le32_to_cpu(h->op);
2754        seq = le64_to_cpu(h->seq);
2755
2756        mutex_lock(&mdsc->mutex);
2757        if (op == CEPH_SESSION_CLOSE) {
2758                get_session(session);
2759                __unregister_session(mdsc, session);
2760        }
2761        /* FIXME: this ttl calculation is generous */
2762        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2763        mutex_unlock(&mdsc->mutex);
2764
2765        mutex_lock(&session->s_mutex);
2766
2767        dout("handle_session mds%d %s %p state %s seq %llu\n",
2768             mds, ceph_session_op_name(op), session,
2769             ceph_session_state_name(session->s_state), seq);
2770
2771        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2772                session->s_state = CEPH_MDS_SESSION_OPEN;
2773                pr_info("mds%d came back\n", session->s_mds);
2774        }
2775
2776        switch (op) {
2777        case CEPH_SESSION_OPEN:
2778                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2779                        pr_info("mds%d reconnect success\n", session->s_mds);
2780                session->s_state = CEPH_MDS_SESSION_OPEN;
2781                renewed_caps(mdsc, session, 0);
2782                wake = 1;
2783                if (mdsc->stopping)
2784                        __close_session(mdsc, session);
2785                break;
2786
2787        case CEPH_SESSION_RENEWCAPS:
2788                if (session->s_renew_seq == seq)
2789                        renewed_caps(mdsc, session, 1);
2790                break;
2791
2792        case CEPH_SESSION_CLOSE:
2793                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2794                        pr_info("mds%d reconnect denied\n", session->s_mds);
2795                cleanup_session_requests(mdsc, session);
2796                remove_session_caps(session);
2797                wake = 2; /* for good measure */
2798                wake_up_all(&mdsc->session_close_wq);
2799                break;
2800
2801        case CEPH_SESSION_STALE:
2802                pr_info("mds%d caps went stale, renewing\n",
2803                        session->s_mds);
2804                spin_lock(&session->s_gen_ttl_lock);
2805                session->s_cap_gen++;
2806                session->s_cap_ttl = jiffies - 1;
2807                spin_unlock(&session->s_gen_ttl_lock);
2808                send_renew_caps(mdsc, session);
2809                break;
2810
2811        case CEPH_SESSION_RECALL_STATE:
2812                ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2813                break;
2814
2815        case CEPH_SESSION_FLUSHMSG:
2816                send_flushmsg_ack(mdsc, session, seq);
2817                break;
2818
2819        case CEPH_SESSION_FORCE_RO:
2820                dout("force_session_readonly %p\n", session);
2821                spin_lock(&session->s_cap_lock);
2822                session->s_readonly = true;
2823                spin_unlock(&session->s_cap_lock);
2824                wake_up_session_caps(session, 0);
2825                break;
2826
2827        case CEPH_SESSION_REJECT:
2828                WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
2829                pr_info("mds%d rejected session\n", session->s_mds);
2830                session->s_state = CEPH_MDS_SESSION_REJECTED;
2831                cleanup_session_requests(mdsc, session);
2832                remove_session_caps(session);
2833                wake = 2; /* for good measure */
2834                break;
2835
2836        default:
2837                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2838                WARN_ON(1);
2839        }
2840
2841        mutex_unlock(&session->s_mutex);
2842        if (wake) {
2843                mutex_lock(&mdsc->mutex);
2844                __wake_requests(mdsc, &session->s_waiting);
2845                if (wake == 2)
2846                        kick_requests(mdsc, mds);
2847                mutex_unlock(&mdsc->mutex);
2848        }
2849        if (op == CEPH_SESSION_CLOSE)
2850                ceph_put_mds_session(session);
2851        return;
2852
2853bad:
2854        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2855               (int)msg->front.iov_len);
2856        ceph_msg_dump(msg);
2857        return;
2858}
2859
2860
2861/*
2862 * called under session->mutex.
2863 */
2864static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2865                                   struct ceph_mds_session *session)
2866{
2867        struct ceph_mds_request *req, *nreq;
2868        struct rb_node *p;
2869        int err;
2870
2871        dout("replay_unsafe_requests mds%d\n", session->s_mds);
2872
2873        mutex_lock(&mdsc->mutex);
2874        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2875                err = __prepare_send_request(mdsc, req, session->s_mds, true);
2876                if (!err) {
2877                        ceph_msg_get(req->r_request);
2878                        ceph_con_send(&session->s_con, req->r_request);
2879                }
2880        }
2881
2882        /*
2883         * also re-send old requests when MDS enters reconnect stage. So that MDS
2884         * can process completed request in clientreplay stage.
2885         */
2886        p = rb_first(&mdsc->request_tree);
2887        while (p) {
2888                req = rb_entry(p, struct ceph_mds_request, r_node);
2889                p = rb_next(p);
2890                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2891                        continue;
2892                if (req->r_attempts == 0)
2893                        continue; /* only old requests */
2894                if (req->r_session &&
2895                    req->r_session->s_mds == session->s_mds) {
2896                        err = __prepare_send_request(mdsc, req,
2897                                                     session->s_mds, true);
2898                        if (!err) {
2899                                ceph_msg_get(req->r_request);
2900                                ceph_con_send(&session->s_con, req->r_request);
2901                        }
2902                }
2903        }
2904        mutex_unlock(&mdsc->mutex);
2905}
2906
2907/*
2908 * Encode information about a cap for a reconnect with the MDS.
2909 */
2910static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2911                          void *arg)
2912{
2913        union {
2914                struct ceph_mds_cap_reconnect v2;
2915                struct ceph_mds_cap_reconnect_v1 v1;
2916        } rec;
2917        struct ceph_inode_info *ci = cap->ci;
2918        struct ceph_reconnect_state *recon_state = arg;
2919        struct ceph_pagelist *pagelist = recon_state->pagelist;
2920        char *path;
2921        int pathlen, err;
2922        u64 pathbase;
2923        u64 snap_follows;
2924        struct dentry *dentry;
2925
2926        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2927             inode, ceph_vinop(inode), cap, cap->cap_id,
2928             ceph_cap_string(cap->issued));
2929        err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2930        if (err)
2931                return err;
2932
2933        dentry = d_find_alias(inode);
2934        if (dentry) {
2935                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2936                if (IS_ERR(path)) {
2937                        err = PTR_ERR(path);
2938                        goto out_dput;
2939                }
2940        } else {
2941                path = NULL;
2942                pathlen = 0;
2943                pathbase = 0;
2944        }
2945
2946        spin_lock(&ci->i_ceph_lock);
2947        cap->seq = 0;        /* reset cap seq */
2948        cap->issue_seq = 0;  /* and issue_seq */
2949        cap->mseq = 0;       /* and migrate_seq */
2950        cap->cap_gen = cap->session->s_cap_gen;
2951
2952        if (recon_state->msg_version >= 2) {
2953                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2954                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2955                rec.v2.issued = cpu_to_le32(cap->issued);
2956                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2957                rec.v2.pathbase = cpu_to_le64(pathbase);
2958                rec.v2.flock_len = (__force __le32)
2959                        ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
2960        } else {
2961                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2962                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2963                rec.v1.issued = cpu_to_le32(cap->issued);
2964                rec.v1.size = cpu_to_le64(inode->i_size);
2965                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2966                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2967                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2968                rec.v1.pathbase = cpu_to_le64(pathbase);
2969        }
2970
2971        if (list_empty(&ci->i_cap_snaps)) {
2972                snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
2973        } else {
2974                struct ceph_cap_snap *capsnap =
2975                        list_first_entry(&ci->i_cap_snaps,
2976                                         struct ceph_cap_snap, ci_item);
2977                snap_follows = capsnap->follows;
2978        }
2979        spin_unlock(&ci->i_ceph_lock);
2980
2981        if (recon_state->msg_version >= 2) {
2982                int num_fcntl_locks, num_flock_locks;
2983                struct ceph_filelock *flocks = NULL;
2984                size_t struct_len, total_len = 0;
2985                u8 struct_v = 0;
2986
2987encode_again:
2988                if (rec.v2.flock_len) {
2989                        ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2990                } else {
2991                        num_fcntl_locks = 0;
2992                        num_flock_locks = 0;
2993                }
2994                if (num_fcntl_locks + num_flock_locks > 0) {
2995                        flocks = kmalloc((num_fcntl_locks + num_flock_locks) *
2996                                         sizeof(struct ceph_filelock), GFP_NOFS);
2997                        if (!flocks) {
2998                                err = -ENOMEM;
2999                                goto out_free;
3000                        }
3001                        err = ceph_encode_locks_to_buffer(inode, flocks,
3002                                                          num_fcntl_locks,
3003                                                          num_flock_locks);
3004                        if (err) {
3005                                kfree(flocks);
3006                                flocks = NULL;
3007                                if (err == -ENOSPC)
3008                                        goto encode_again;
3009                                goto out_free;
3010                        }
3011                } else {
3012                        kfree(flocks);
3013                        flocks = NULL;
3014                }
3015
3016                if (recon_state->msg_version >= 3) {
3017                        /* version, compat_version and struct_len */
3018                        total_len = 2 * sizeof(u8) + sizeof(u32);
3019                        struct_v = 2;
3020                }
3021                /*
3022                 * number of encoded locks is stable, so copy to pagelist
3023                 */
3024                struct_len = 2 * sizeof(u32) +
3025                            (num_fcntl_locks + num_flock_locks) *
3026                            sizeof(struct ceph_filelock);
3027                rec.v2.flock_len = cpu_to_le32(struct_len);
3028
3029                struct_len += sizeof(rec.v2);
3030                struct_len += sizeof(u32) + pathlen;
3031
3032                if (struct_v >= 2)
3033                        struct_len += sizeof(u64); /* snap_follows */
3034
3035                total_len += struct_len;
3036                err = ceph_pagelist_reserve(pagelist, total_len);
3037
3038                if (!err) {
3039                        if (recon_state->msg_version >= 3) {
3040                                ceph_pagelist_encode_8(pagelist, struct_v);
3041                                ceph_pagelist_encode_8(pagelist, 1);
3042                                ceph_pagelist_encode_32(pagelist, struct_len);
3043                        }
3044                        ceph_pagelist_encode_string(pagelist, path, pathlen);
3045                        ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3046                        ceph_locks_to_pagelist(flocks, pagelist,
3047                                               num_fcntl_locks,
3048                                               num_flock_locks);
3049                        if (struct_v >= 2)
3050                                ceph_pagelist_encode_64(pagelist, snap_follows);
3051                }
3052                kfree(flocks);
3053        } else {
3054                size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
3055                err = ceph_pagelist_reserve(pagelist, size);
3056                if (!err) {
3057                        ceph_pagelist_encode_string(pagelist, path, pathlen);
3058                        ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3059                }
3060        }
3061
3062        recon_state->nr_caps++;
3063out_free:
3064        kfree(path);
3065out_dput:
3066        dput(dentry);
3067        return err;
3068}
3069
3070
3071/*
3072 * If an MDS fails and recovers, clients need to reconnect in order to
3073 * reestablish shared state.  This includes all caps issued through
3074 * this session _and_ the snap_realm hierarchy.  Because it's not
3075 * clear which snap realms the mds cares about, we send everything we
3076 * know about.. that ensures we'll then get any new info the
3077 * recovering MDS might have.
3078 *
3079 * This is a relatively heavyweight operation, but it's rare.
3080 *
3081 * called with mdsc->mutex held.
3082 */
3083static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3084                               struct ceph_mds_session *session)
3085{
3086        struct ceph_msg *reply;
3087        struct rb_node *p;
3088        int mds = session->s_mds;
3089        int err = -ENOMEM;
3090        int s_nr_caps;
3091        struct ceph_pagelist *pagelist;
3092        struct ceph_reconnect_state recon_state;
3093        LIST_HEAD(dispose);
3094
3095        pr_info("mds%d reconnect start\n", mds);
3096
3097        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
3098        if (!pagelist)
3099                goto fail_nopagelist;
3100        ceph_pagelist_init(pagelist);
3101
3102        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
3103        if (!reply)
3104                goto fail_nomsg;
3105
3106        mutex_lock(&session->s_mutex);
3107        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3108        session->s_seq = 0;
3109
3110        dout("session %p state %s\n", session,
3111             ceph_session_state_name(session->s_state));
3112
3113        spin_lock(&session->s_gen_ttl_lock);
3114        session->s_cap_gen++;
3115        spin_unlock(&session->s_gen_ttl_lock);
3116
3117        spin_lock(&session->s_cap_lock);
3118        /* don't know if session is readonly */
3119        session->s_readonly = 0;
3120        /*
3121         * notify __ceph_remove_cap() that we are composing cap reconnect.
3122         * If a cap get released before being added to the cap reconnect,
3123         * __ceph_remove_cap() should skip queuing cap release.
3124         */
3125        session->s_cap_reconnect = 1;
3126        /* drop old cap expires; we're about to reestablish that state */
3127        detach_cap_releases(session, &dispose);
3128        spin_unlock(&session->s_cap_lock);
3129        dispose_cap_releases(mdsc, &dispose);
3130
3131        /* trim unused caps to reduce MDS's cache rejoin time */
3132        if (mdsc->fsc->sb->s_root)
3133                shrink_dcache_parent(mdsc->fsc->sb->s_root);
3134
3135        ceph_con_close(&session->s_con);
3136        ceph_con_open(&session->s_con,
3137                      CEPH_ENTITY_TYPE_MDS, mds,
3138                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3139
3140        /* replay unsafe requests */
3141        replay_unsafe_requests(mdsc, session);
3142
3143        down_read(&mdsc->snap_rwsem);
3144
3145        /* traverse this session's caps */
3146        s_nr_caps = session->s_nr_caps;
3147        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
3148        if (err)
3149                goto fail;
3150
3151        recon_state.nr_caps = 0;
3152        recon_state.pagelist = pagelist;
3153        if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
3154                recon_state.msg_version = 3;
3155        else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
3156                recon_state.msg_version = 2;
3157        else
3158                recon_state.msg_version = 1;
3159        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
3160        if (err < 0)
3161                goto fail;
3162
3163        spin_lock(&session->s_cap_lock);
3164        session->s_cap_reconnect = 0;
3165        spin_unlock(&session->s_cap_lock);
3166
3167        /*
3168         * snaprealms.  we provide mds with the ino, seq (version), and
3169         * parent for all of our realms.  If the mds has any newer info,
3170         * it will tell us.
3171         */
3172        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3173                struct ceph_snap_realm *realm =
3174                        rb_entry(p, struct ceph_snap_realm, node);
3175                struct ceph_mds_snaprealm_reconnect sr_rec;
3176
3177                dout(" adding snap realm %llx seq %lld parent %llx\n",
3178                     realm->ino, realm->seq, realm->parent_ino);
3179                sr_rec.ino = cpu_to_le64(realm->ino);
3180                sr_rec.seq = cpu_to_le64(realm->seq);
3181                sr_rec.parent = cpu_to_le64(realm->parent_ino);
3182                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3183                if (err)
3184                        goto fail;
3185        }
3186
3187        reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3188
3189        /* raced with cap release? */
3190        if (s_nr_caps != recon_state.nr_caps) {
3191                struct page *page = list_first_entry(&pagelist->head,
3192                                                     struct page, lru);
3193                __le32 *addr = kmap_atomic(page);
3194                *addr = cpu_to_le32(recon_state.nr_caps);
3195                kunmap_atomic(addr);
3196        }
3197
3198        reply->hdr.data_len = cpu_to_le32(pagelist->length);
3199        ceph_msg_data_add_pagelist(reply, pagelist);
3200
3201        ceph_early_kick_flushing_caps(mdsc, session);
3202
3203        ceph_con_send(&session->s_con, reply);
3204
3205        mutex_unlock(&session->s_mutex);
3206
3207        mutex_lock(&mdsc->mutex);
3208        __wake_requests(mdsc, &session->s_waiting);
3209        mutex_unlock(&mdsc->mutex);
3210
3211        up_read(&mdsc->snap_rwsem);
3212        return;
3213
3214fail:
3215        ceph_msg_put(reply);
3216        up_read(&mdsc->snap_rwsem);
3217        mutex_unlock(&session->s_mutex);
3218fail_nomsg:
3219        ceph_pagelist_release(pagelist);
3220fail_nopagelist:
3221        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3222        return;
3223}
3224
3225
3226/*
3227 * compare old and new mdsmaps, kicking requests
3228 * and closing out old connections as necessary
3229 *
3230 * called under mdsc->mutex.
3231 */
3232static void check_new_map(struct ceph_mds_client *mdsc,
3233                          struct ceph_mdsmap *newmap,
3234                          struct ceph_mdsmap *oldmap)
3235{
3236        int i;
3237        int oldstate, newstate;
3238        struct ceph_mds_session *s;
3239
3240        dout("check_new_map new %u old %u\n",
3241             newmap->m_epoch, oldmap->m_epoch);
3242
3243        for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3244                if (!mdsc->sessions[i])
3245                        continue;
3246                s = mdsc->sessions[i];
3247                oldstate = ceph_mdsmap_get_state(oldmap, i);
3248                newstate = ceph_mdsmap_get_state(newmap, i);
3249
3250                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3251                     i, ceph_mds_state_name(oldstate),
3252                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3253                     ceph_mds_state_name(newstate),
3254                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3255                     ceph_session_state_name(s->s_state));
3256
3257                if (i >= newmap->m_num_mds ||
3258                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
3259                           ceph_mdsmap_get_addr(newmap, i),
3260                           sizeof(struct ceph_entity_addr))) {
3261                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3262                                /* the session never opened, just close it
3263                                 * out now */
3264                                get_session(s);
3265                                __unregister_session(mdsc, s);
3266                                __wake_requests(mdsc, &s->s_waiting);
3267                                ceph_put_mds_session(s);
3268                        } else if (i >= newmap->m_num_mds) {
3269                                /* force close session for stopped mds */
3270                                get_session(s);
3271                                __unregister_session(mdsc, s);
3272                                __wake_requests(mdsc, &s->s_waiting);
3273                                kick_requests(mdsc, i);
3274                                mutex_unlock(&mdsc->mutex);
3275
3276                                mutex_lock(&s->s_mutex);
3277                                cleanup_session_requests(mdsc, s);
3278                                remove_session_caps(s);
3279                                mutex_unlock(&s->s_mutex);
3280
3281                                ceph_put_mds_session(s);
3282
3283                                mutex_lock(&mdsc->mutex);
3284                        } else {
3285                                /* just close it */
3286                                mutex_unlock(&mdsc->mutex);
3287                                mutex_lock(&s->s_mutex);
3288                                mutex_lock(&mdsc->mutex);
3289                                ceph_con_close(&s->s_con);
3290                                mutex_unlock(&s->s_mutex);
3291                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
3292                        }
3293                } else if (oldstate == newstate) {
3294                        continue;  /* nothing new with this mds */
3295                }
3296
3297                /*
3298                 * send reconnect?
3299                 */
3300                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3301                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3302                        mutex_unlock(&mdsc->mutex);
3303                        send_mds_reconnect(mdsc, s);
3304                        mutex_lock(&mdsc->mutex);
3305                }
3306
3307                /*
3308                 * kick request on any mds that has gone active.
3309                 */
3310                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3311                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3312                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3313                            oldstate != CEPH_MDS_STATE_STARTING)
3314                                pr_info("mds%d recovery completed\n", s->s_mds);
3315                        kick_requests(mdsc, i);
3316                        ceph_kick_flushing_caps(mdsc, s);
3317                        wake_up_session_caps(s, 1);
3318                }
3319        }
3320
3321        for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3322                s = mdsc->sessions[i];
3323                if (!s)
3324                        continue;
3325                if (!ceph_mdsmap_is_laggy(newmap, i))
3326                        continue;
3327                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3328                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3329                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3330                        dout(" connecting to export targets of laggy mds%d\n",
3331                             i);
3332                        __open_export_target_sessions(mdsc, s);
3333                }
3334        }
3335}
3336
3337
3338
3339/*
3340 * leases
3341 */
3342
3343/*
3344 * caller must hold session s_mutex, dentry->d_lock
3345 */
3346void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3347{
3348        struct ceph_dentry_info *di = ceph_dentry(dentry);
3349
3350        ceph_put_mds_session(di->lease_session);
3351        di->lease_session = NULL;
3352}
3353
3354static void handle_lease(struct ceph_mds_client *mdsc,
3355                         struct ceph_mds_session *session,
3356                         struct ceph_msg *msg)
3357{
3358        struct super_block *sb = mdsc->fsc->sb;
3359        struct inode *inode;
3360        struct dentry *parent, *dentry;
3361        struct ceph_dentry_info *di;
3362        int mds = session->s_mds;
3363        struct ceph_mds_lease *h = msg->front.iov_base;
3364        u32 seq;
3365        struct ceph_vino vino;
3366        struct qstr dname;
3367        int release = 0;
3368
3369        dout("handle_lease from mds%d\n", mds);
3370
3371        /* decode */
3372        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3373                goto bad;
3374        vino.ino = le64_to_cpu(h->ino);
3375        vino.snap = CEPH_NOSNAP;
3376        seq = le32_to_cpu(h->seq);
3377        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3378        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3379        if (dname.len != get_unaligned_le32(h+1))
3380                goto bad;
3381
3382        /* lookup inode */
3383        inode = ceph_find_inode(sb, vino);
3384        dout("handle_lease %s, ino %llx %p %.*s\n",
3385             ceph_lease_op_name(h->action), vino.ino, inode,
3386             dname.len, dname.name);
3387
3388        mutex_lock(&session->s_mutex);
3389        session->s_seq++;
3390
3391        if (!inode) {
3392                dout("handle_lease no inode %llx\n", vino.ino);
3393                goto release;
3394        }
3395
3396        /* dentry */
3397        parent = d_find_alias(inode);
3398        if (!parent) {
3399                dout("no parent dentry on inode %p\n", inode);
3400                WARN_ON(1);
3401                goto release;  /* hrm... */
3402        }
3403        dname.hash = full_name_hash(parent, dname.name, dname.len);
3404        dentry = d_lookup(parent, &dname);
3405        dput(parent);
3406        if (!dentry)
3407                goto release;
3408
3409        spin_lock(&dentry->d_lock);
3410        di = ceph_dentry(dentry);
3411        switch (h->action) {
3412        case CEPH_MDS_LEASE_REVOKE:
3413                if (di->lease_session == session) {
3414                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3415                                h->seq = cpu_to_le32(di->lease_seq);
3416                        __ceph_mdsc_drop_dentry_lease(dentry);
3417                }
3418                release = 1;
3419                break;
3420
3421        case CEPH_MDS_LEASE_RENEW:
3422                if (di->lease_session == session &&
3423                    di->lease_gen == session->s_cap_gen &&
3424                    di->lease_renew_from &&
3425                    di->lease_renew_after == 0) {
3426                        unsigned long duration =
3427                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3428
3429                        di->lease_seq = seq;
3430                        di->time = di->lease_renew_from + duration;
3431                        di->lease_renew_after = di->lease_renew_from +
3432                                (duration >> 1);
3433                        di->lease_renew_from = 0;
3434                }
3435                break;
3436        }
3437        spin_unlock(&dentry->d_lock);
3438        dput(dentry);
3439
3440        if (!release)
3441                goto out;
3442
3443release:
3444        /* let's just reuse the same message */
3445        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3446        ceph_msg_get(msg);
3447        ceph_con_send(&session->s_con, msg);
3448
3449out:
3450        iput(inode);
3451        mutex_unlock(&session->s_mutex);
3452        return;
3453
3454bad:
3455        pr_err("corrupt lease message\n");
3456        ceph_msg_dump(msg);
3457}
3458
3459void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3460                              struct inode *inode,
3461                              struct dentry *dentry, char action,
3462                              u32 seq)
3463{
3464        struct ceph_msg *msg;
3465        struct ceph_mds_lease *lease;
3466        int len = sizeof(*lease) + sizeof(u32);
3467        int dnamelen = 0;
3468
3469        dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3470             inode, dentry, ceph_lease_op_name(action), session->s_mds);
3471        dnamelen = dentry->d_name.len;
3472        len += dnamelen;
3473
3474        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3475        if (!msg)
3476                return;
3477        lease = msg->front.iov_base;
3478        lease->action = action;
3479        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3480        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3481        lease->seq = cpu_to_le32(seq);
3482        put_unaligned_le32(dnamelen, lease + 1);
3483        memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3484
3485        /*
3486         * if this is a preemptive lease RELEASE, no need to
3487         * flush request stream, since the actual request will
3488         * soon follow.
3489         */
3490        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3491
3492        ceph_con_send(&session->s_con, msg);
3493}
3494
3495/*
3496 * lock unlock sessions, to wait ongoing session activities
3497 */
3498static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
3499{
3500        int i;
3501
3502        mutex_lock(&mdsc->mutex);
3503        for (i = 0; i < mdsc->max_sessions; i++) {
3504                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3505                if (!s)
3506                        continue;
3507                mutex_unlock(&mdsc->mutex);
3508                mutex_lock(&s->s_mutex);
3509                mutex_unlock(&s->s_mutex);
3510                ceph_put_mds_session(s);
3511                mutex_lock(&mdsc->mutex);
3512        }
3513        mutex_unlock(&mdsc->mutex);
3514}
3515
3516
3517
3518/*
3519 * delayed work -- periodically trim expired leases, renew caps with mds
3520 */
3521static void schedule_delayed(struct ceph_mds_client *mdsc)
3522{
3523        int delay = 5;
3524        unsigned hz = round_jiffies_relative(HZ * delay);
3525        schedule_delayed_work(&mdsc->delayed_work, hz);
3526}
3527
3528static void delayed_work(struct work_struct *work)
3529{
3530        int i;
3531        struct ceph_mds_client *mdsc =
3532                container_of(work, struct ceph_mds_client, delayed_work.work);
3533        int renew_interval;
3534        int renew_caps;
3535
3536        dout("mdsc delayed_work\n");
3537        ceph_check_delayed_caps(mdsc);
3538
3539        mutex_lock(&mdsc->mutex);
3540        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3541        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3542                                   mdsc->last_renew_caps);
3543        if (renew_caps)
3544                mdsc->last_renew_caps = jiffies;
3545
3546        for (i = 0; i < mdsc->max_sessions; i++) {
3547                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3548                if (!s)
3549                        continue;
3550                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3551                        dout("resending session close request for mds%d\n",
3552                             s->s_mds);
3553                        request_close_session(mdsc, s);
3554                        ceph_put_mds_session(s);
3555                        continue;
3556                }
3557                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3558                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3559                                s->s_state = CEPH_MDS_SESSION_HUNG;
3560                                pr_info("mds%d hung\n", s->s_mds);
3561                        }
3562                }
3563                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3564                        /* this mds is failed or recovering, just wait */
3565                        ceph_put_mds_session(s);
3566                        continue;
3567                }
3568                mutex_unlock(&mdsc->mutex);
3569
3570                mutex_lock(&s->s_mutex);
3571                if (renew_caps)
3572                        send_renew_caps(mdsc, s);
3573                else
3574                        ceph_con_keepalive(&s->s_con);
3575                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3576                    s->s_state == CEPH_MDS_SESSION_HUNG)
3577                        ceph_send_cap_releases(mdsc, s);
3578                mutex_unlock(&s->s_mutex);
3579                ceph_put_mds_session(s);
3580
3581                mutex_lock(&mdsc->mutex);
3582        }
3583        mutex_unlock(&mdsc->mutex);
3584
3585        schedule_delayed(mdsc);
3586}
3587
3588int ceph_mdsc_init(struct ceph_fs_client *fsc)
3589
3590{
3591        struct ceph_mds_client *mdsc;
3592
3593        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3594        if (!mdsc)
3595                return -ENOMEM;
3596        mdsc->fsc = fsc;
3597        mutex_init(&mdsc->mutex);
3598        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3599        if (!mdsc->mdsmap) {
3600                kfree(mdsc);
3601                return -ENOMEM;
3602        }
3603
3604        fsc->mdsc = mdsc;
3605        init_completion(&mdsc->safe_umount_waiters);
3606        init_waitqueue_head(&mdsc->session_close_wq);
3607        INIT_LIST_HEAD(&mdsc->waiting_for_map);
3608        mdsc->sessions = NULL;
3609        atomic_set(&mdsc->num_sessions, 0);
3610        mdsc->max_sessions = 0;
3611        mdsc->stopping = 0;
3612        atomic64_set(&mdsc->quotarealms_count, 0);
3613        mdsc->last_snap_seq = 0;
3614        init_rwsem(&mdsc->snap_rwsem);
3615        mdsc->snap_realms = RB_ROOT;
3616        INIT_LIST_HEAD(&mdsc->snap_empty);
3617        spin_lock_init(&mdsc->snap_empty_lock);
3618        mdsc->last_tid = 0;
3619        mdsc->oldest_tid = 0;
3620        mdsc->request_tree = RB_ROOT;
3621        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3622        mdsc->last_renew_caps = jiffies;
3623        INIT_LIST_HEAD(&mdsc->cap_delay_list);
3624        spin_lock_init(&mdsc->cap_delay_lock);
3625        INIT_LIST_HEAD(&mdsc->snap_flush_list);
3626        spin_lock_init(&mdsc->snap_flush_lock);
3627        mdsc->last_cap_flush_tid = 1;
3628        INIT_LIST_HEAD(&mdsc->cap_flush_list);
3629        INIT_LIST_HEAD(&mdsc->cap_dirty);
3630        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3631        mdsc->num_cap_flushing = 0;
3632        spin_lock_init(&mdsc->cap_dirty_lock);
3633        init_waitqueue_head(&mdsc->cap_flushing_wq);
3634        spin_lock_init(&mdsc->dentry_lru_lock);
3635        INIT_LIST_HEAD(&mdsc->dentry_lru);
3636
3637        ceph_caps_init(mdsc);
3638        ceph_adjust_min_caps(mdsc, fsc->min_caps);
3639
3640        init_rwsem(&mdsc->pool_perm_rwsem);
3641        mdsc->pool_perm_tree = RB_ROOT;
3642
3643        strncpy(mdsc->nodename, utsname()->nodename,
3644                sizeof(mdsc->nodename) - 1);
3645        return 0;
3646}
3647
3648/*
3649 * Wait for safe replies on open mds requests.  If we time out, drop
3650 * all requests from the tree to avoid dangling dentry refs.
3651 */
3652static void wait_requests(struct ceph_mds_client *mdsc)
3653{
3654        struct ceph_options *opts = mdsc->fsc->client->options;
3655        struct ceph_mds_request *req;
3656
3657        mutex_lock(&mdsc->mutex);
3658        if (__get_oldest_req(mdsc)) {
3659                mutex_unlock(&mdsc->mutex);
3660
3661                dout("wait_requests waiting for requests\n");
3662                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3663                                    ceph_timeout_jiffies(opts->mount_timeout));
3664
3665                /* tear down remaining requests */
3666                mutex_lock(&mdsc->mutex);
3667                while ((req = __get_oldest_req(mdsc))) {
3668                        dout("wait_requests timed out on tid %llu\n",
3669                             req->r_tid);
3670                        __unregister_request(mdsc, req);
3671                }
3672        }
3673        mutex_unlock(&mdsc->mutex);
3674        dout("wait_requests done\n");
3675}
3676
3677/*
3678 * called before mount is ro, and before dentries are torn down.
3679 * (hmm, does this still race with new lookups?)
3680 */
3681void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3682{
3683        dout("pre_umount\n");
3684        mdsc->stopping = 1;
3685
3686        lock_unlock_sessions(mdsc);
3687        ceph_flush_dirty_caps(mdsc);
3688        wait_requests(mdsc);
3689
3690        /*
3691         * wait for reply handlers to drop their request refs and
3692         * their inode/dcache refs
3693         */
3694        ceph_msgr_flush();
3695}
3696
3697/*
3698 * wait for all write mds requests to flush.
3699 */
3700static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3701{
3702        struct ceph_mds_request *req = NULL, *nextreq;
3703        struct rb_node *n;
3704
3705        mutex_lock(&mdsc->mutex);
3706        dout("wait_unsafe_requests want %lld\n", want_tid);
3707restart:
3708        req = __get_oldest_req(mdsc);
3709        while (req && req->r_tid <= want_tid) {
3710                /* find next request */
3711                n = rb_next(&req->r_node);
3712                if (n)
3713                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3714                else
3715                        nextreq = NULL;
3716                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3717                    (req->r_op & CEPH_MDS_OP_WRITE)) {
3718                        /* write op */
3719                        ceph_mdsc_get_request(req);
3720                        if (nextreq)
3721                                ceph_mdsc_get_request(nextreq);
3722                        mutex_unlock(&mdsc->mutex);
3723                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3724                             req->r_tid, want_tid);
3725                        wait_for_completion(&req->r_safe_completion);
3726                        mutex_lock(&mdsc->mutex);
3727                        ceph_mdsc_put_request(req);
3728                        if (!nextreq)
3729                                break;  /* next dne before, so we're done! */
3730                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
3731                                /* next request was removed from tree */
3732                                ceph_mdsc_put_request(nextreq);
3733                                goto restart;
3734                        }
3735                        ceph_mdsc_put_request(nextreq);  /* won't go away */
3736                }
3737                req = nextreq;
3738        }
3739        mutex_unlock(&mdsc->mutex);
3740        dout("wait_unsafe_requests done\n");
3741}
3742
3743void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3744{
3745        u64 want_tid, want_flush;
3746
3747        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3748                return;
3749
3750        dout("sync\n");
3751        mutex_lock(&mdsc->mutex);
3752        want_tid = mdsc->last_tid;
3753        mutex_unlock(&mdsc->mutex);
3754
3755        ceph_flush_dirty_caps(mdsc);
3756        spin_lock(&mdsc->cap_dirty_lock);
3757        want_flush = mdsc->last_cap_flush_tid;
3758        if (!list_empty(&mdsc->cap_flush_list)) {
3759                struct ceph_cap_flush *cf =
3760                        list_last_entry(&mdsc->cap_flush_list,
3761                                        struct ceph_cap_flush, g_list);
3762                cf->wake = true;
3763        }
3764        spin_unlock(&mdsc->cap_dirty_lock);
3765
3766        dout("sync want tid %lld flush_seq %lld\n",
3767             want_tid, want_flush);
3768
3769        wait_unsafe_requests(mdsc, want_tid);
3770        wait_caps_flush(mdsc, want_flush);
3771}
3772
3773/*
3774 * true if all sessions are closed, or we force unmount
3775 */
3776static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
3777{
3778        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3779                return true;
3780        return atomic_read(&mdsc->num_sessions) <= skipped;
3781}
3782
3783/*
3784 * called after sb is ro.
3785 */
3786void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3787{
3788        struct ceph_options *opts = mdsc->fsc->client->options;
3789        struct ceph_mds_session *session;
3790        int i;
3791        int skipped = 0;
3792
3793        dout("close_sessions\n");
3794
3795        /* close sessions */
3796        mutex_lock(&mdsc->mutex);
3797        for (i = 0; i < mdsc->max_sessions; i++) {
3798                session = __ceph_lookup_mds_session(mdsc, i);
3799                if (!session)
3800                        continue;
3801                mutex_unlock(&mdsc->mutex);
3802                mutex_lock(&session->s_mutex);
3803                if (__close_session(mdsc, session) <= 0)
3804                        skipped++;
3805                mutex_unlock(&session->s_mutex);
3806                ceph_put_mds_session(session);
3807                mutex_lock(&mdsc->mutex);
3808        }
3809        mutex_unlock(&mdsc->mutex);
3810
3811        dout("waiting for sessions to close\n");
3812        wait_event_timeout(mdsc->session_close_wq,
3813                           done_closing_sessions(mdsc, skipped),
3814                           ceph_timeout_jiffies(opts->mount_timeout));
3815
3816        /* tear down remaining sessions */
3817        mutex_lock(&mdsc->mutex);
3818        for (i = 0; i < mdsc->max_sessions; i++) {
3819                if (mdsc->sessions[i]) {
3820                        session = get_session(mdsc->sessions[i]);
3821                        __unregister_session(mdsc, session);
3822                        mutex_unlock(&mdsc->mutex);
3823                        mutex_lock(&session->s_mutex);
3824                        remove_session_caps(session);
3825                        mutex_unlock(&session->s_mutex);
3826                        ceph_put_mds_session(session);
3827                        mutex_lock(&mdsc->mutex);
3828                }
3829        }
3830        WARN_ON(!list_empty(&mdsc->cap_delay_list));
3831        mutex_unlock(&mdsc->mutex);
3832
3833        ceph_cleanup_empty_realms(mdsc);
3834
3835        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3836
3837        dout("stopped\n");
3838}
3839
3840void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3841{
3842        struct ceph_mds_session *session;
3843        int mds;
3844
3845        dout("force umount\n");
3846
3847        mutex_lock(&mdsc->mutex);
3848        for (mds = 0; mds < mdsc->max_sessions; mds++) {
3849                session = __ceph_lookup_mds_session(mdsc, mds);
3850                if (!session)
3851                        continue;
3852                mutex_unlock(&mdsc->mutex);
3853                mutex_lock(&session->s_mutex);
3854                __close_session(mdsc, session);
3855                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3856                        cleanup_session_requests(mdsc, session);
3857                        remove_session_caps(session);
3858                }
3859                mutex_unlock(&session->s_mutex);
3860                ceph_put_mds_session(session);
3861                mutex_lock(&mdsc->mutex);
3862                kick_requests(mdsc, mds);
3863        }
3864        __wake_requests(mdsc, &mdsc->waiting_for_map);
3865        mutex_unlock(&mdsc->mutex);
3866}
3867
3868static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3869{
3870        dout("stop\n");
3871        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3872        if (mdsc->mdsmap)
3873                ceph_mdsmap_destroy(mdsc->mdsmap);
3874        kfree(mdsc->sessions);
3875        ceph_caps_finalize(mdsc);
3876        ceph_pool_perm_destroy(mdsc);
3877}
3878
3879void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3880{
3881        struct ceph_mds_client *mdsc = fsc->mdsc;
3882        dout("mdsc_destroy %p\n", mdsc);
3883
3884        if (!mdsc)
3885                return;
3886
3887        /* flush out any connection work with references to us */
3888        ceph_msgr_flush();
3889
3890        ceph_mdsc_stop(mdsc);
3891
3892        fsc->mdsc = NULL;
3893        kfree(mdsc);
3894        dout("mdsc_destroy %p done\n", mdsc);
3895}
3896
3897void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3898{
3899        struct ceph_fs_client *fsc = mdsc->fsc;
3900        const char *mds_namespace = fsc->mount_options->mds_namespace;
3901        void *p = msg->front.iov_base;
3902        void *end = p + msg->front.iov_len;
3903        u32 epoch;
3904        u32 map_len;
3905        u32 num_fs;
3906        u32 mount_fscid = (u32)-1;
3907        u8 struct_v, struct_cv;
3908        int err = -EINVAL;
3909
3910        ceph_decode_need(&p, end, sizeof(u32), bad);
3911        epoch = ceph_decode_32(&p);
3912
3913        dout("handle_fsmap epoch %u\n", epoch);
3914
3915        ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
3916        struct_v = ceph_decode_8(&p);
3917        struct_cv = ceph_decode_8(&p);
3918        map_len = ceph_decode_32(&p);
3919
3920        ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
3921        p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
3922
3923        num_fs = ceph_decode_32(&p);
3924        while (num_fs-- > 0) {
3925                void *info_p, *info_end;
3926                u32 info_len;
3927                u8 info_v, info_cv;
3928                u32 fscid, namelen;
3929
3930                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
3931                info_v = ceph_decode_8(&p);
3932                info_cv = ceph_decode_8(&p);
3933                info_len = ceph_decode_32(&p);
3934                ceph_decode_need(&p, end, info_len, bad);
3935                info_p = p;
3936                info_end = p + info_len;
3937                p = info_end;
3938
3939                ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
3940                fscid = ceph_decode_32(&info_p);
3941                namelen = ceph_decode_32(&info_p);
3942                ceph_decode_need(&info_p, info_end, namelen, bad);
3943
3944                if (mds_namespace &&
3945                    strlen(mds_namespace) == namelen &&
3946                    !strncmp(mds_namespace, (char *)info_p, namelen)) {
3947                        mount_fscid = fscid;
3948                        break;
3949                }
3950        }
3951
3952        ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
3953        if (mount_fscid != (u32)-1) {
3954                fsc->client->monc.fs_cluster_id = mount_fscid;
3955                ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
3956                                   0, true);
3957                ceph_monc_renew_subs(&fsc->client->monc);
3958        } else {
3959                err = -ENOENT;
3960                goto err_out;
3961        }
3962        return;
3963
3964bad:
3965        pr_err("error decoding fsmap\n");
3966err_out:
3967        mutex_lock(&mdsc->mutex);
3968        mdsc->mdsmap_err = err;
3969        __wake_requests(mdsc, &mdsc->waiting_for_map);
3970        mutex_unlock(&mdsc->mutex);
3971}
3972
3973/*
3974 * handle mds map update.
3975 */
3976void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3977{
3978        u32 epoch;
3979        u32 maplen;
3980        void *p = msg->front.iov_base;
3981        void *end = p + msg->front.iov_len;
3982        struct ceph_mdsmap *newmap, *oldmap;
3983        struct ceph_fsid fsid;
3984        int err = -EINVAL;
3985
3986        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3987        ceph_decode_copy(&p, &fsid, sizeof(fsid));
3988        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3989                return;
3990        epoch = ceph_decode_32(&p);
3991        maplen = ceph_decode_32(&p);
3992        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3993
3994        /* do we need it? */
3995        mutex_lock(&mdsc->mutex);
3996        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3997                dout("handle_map epoch %u <= our %u\n",
3998                     epoch, mdsc->mdsmap->m_epoch);
3999                mutex_unlock(&mdsc->mutex);
4000                return;
4001        }
4002
4003        newmap = ceph_mdsmap_decode(&p, end);
4004        if (IS_ERR(newmap)) {
4005                err = PTR_ERR(newmap);
4006                goto bad_unlock;
4007        }
4008
4009        /* swap into place */
4010        if (mdsc->mdsmap) {
4011                oldmap = mdsc->mdsmap;
4012                mdsc->mdsmap = newmap;
4013                check_new_map(mdsc, newmap, oldmap);
4014                ceph_mdsmap_destroy(oldmap);
4015        } else {
4016                mdsc->mdsmap = newmap;  /* first mds map */
4017        }
4018        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
4019
4020        __wake_requests(mdsc, &mdsc->waiting_for_map);
4021        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4022                          mdsc->mdsmap->m_epoch);
4023
4024        mutex_unlock(&mdsc->mutex);
4025        schedule_delayed(mdsc);
4026        return;
4027
4028bad_unlock:
4029        mutex_unlock(&mdsc->mutex);
4030bad:
4031        pr_err("error decoding mdsmap %d\n", err);
4032        return;
4033}
4034
4035static struct ceph_connection *con_get(struct ceph_connection *con)
4036{
4037        struct ceph_mds_session *s = con->private;
4038
4039        if (get_session(s)) {
4040                dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
4041                return con;
4042        }
4043        dout("mdsc con_get %p FAIL\n", s);
4044        return NULL;
4045}
4046
4047static void con_put(struct ceph_connection *con)
4048{
4049        struct ceph_mds_session *s = con->private;
4050
4051        dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
4052        ceph_put_mds_session(s);
4053}
4054
4055/*
4056 * if the client is unresponsive for long enough, the mds will kill
4057 * the session entirely.
4058 */
4059static void peer_reset(struct ceph_connection *con)
4060{
4061        struct ceph_mds_session *s = con->private;
4062        struct ceph_mds_client *mdsc = s->s_mdsc;
4063
4064        pr_warn("mds%d closed our session\n", s->s_mds);
4065        send_mds_reconnect(mdsc, s);
4066}
4067
4068static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4069{
4070        struct ceph_mds_session *s = con->private;
4071        struct ceph_mds_client *mdsc = s->s_mdsc;
4072        int type = le16_to_cpu(msg->hdr.type);
4073
4074        mutex_lock(&mdsc->mutex);
4075        if (__verify_registered_session(mdsc, s) < 0) {
4076                mutex_unlock(&mdsc->mutex);
4077                goto out;
4078        }
4079        mutex_unlock(&mdsc->mutex);
4080
4081        switch (type) {
4082        case CEPH_MSG_MDS_MAP:
4083                ceph_mdsc_handle_mdsmap(mdsc, msg);
4084                break;
4085        case CEPH_MSG_FS_MAP_USER:
4086                ceph_mdsc_handle_fsmap(mdsc, msg);
4087                break;
4088        case CEPH_MSG_CLIENT_SESSION:
4089                handle_session(s, msg);
4090                break;
4091        case CEPH_MSG_CLIENT_REPLY:
4092                handle_reply(s, msg);
4093                break;
4094        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4095                handle_forward(mdsc, s, msg);
4096                break;
4097        case CEPH_MSG_CLIENT_CAPS:
4098                ceph_handle_caps(s, msg);
4099                break;
4100        case CEPH_MSG_CLIENT_SNAP:
4101                ceph_handle_snap(mdsc, s, msg);
4102                break;
4103        case CEPH_MSG_CLIENT_LEASE:
4104                handle_lease(mdsc, s, msg);
4105                break;
4106        case CEPH_MSG_CLIENT_QUOTA:
4107                ceph_handle_quota(mdsc, s, msg);
4108                break;
4109
4110        default:
4111                pr_err("received unknown message type %d %s\n", type,
4112                       ceph_msg_type_name(type));
4113        }
4114out:
4115        ceph_msg_put(msg);
4116}
4117
4118/*
4119 * authentication
4120 */
4121
4122/*
4123 * Note: returned pointer is the address of a structure that's
4124 * managed separately.  Caller must *not* attempt to free it.
4125 */
4126static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4127                                        int *proto, int force_new)
4128{
4129        struct ceph_mds_session *s = con->private;
4130        struct ceph_mds_client *mdsc = s->s_mdsc;
4131        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4132        struct ceph_auth_handshake *auth = &s->s_auth;
4133
4134        if (force_new && auth->authorizer) {
4135                ceph_auth_destroy_authorizer(auth->authorizer);
4136                auth->authorizer = NULL;
4137        }
4138        if (!auth->authorizer) {
4139                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4140                                                      auth);
4141                if (ret)
4142                        return ERR_PTR(ret);
4143        } else {
4144                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4145                                                      auth);
4146                if (ret)
4147                        return ERR_PTR(ret);
4148        }
4149        *proto = ac->protocol;
4150
4151        return auth;
4152}
4153
4154
4155static int verify_authorizer_reply(struct ceph_connection *con)
4156{
4157        struct ceph_mds_session *s = con->private;
4158        struct ceph_mds_client *mdsc = s->s_mdsc;
4159        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4160
4161        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4162}
4163
4164static int invalidate_authorizer(struct ceph_connection *con)
4165{
4166        struct ceph_mds_session *s = con->private;
4167        struct ceph_mds_client *mdsc = s->s_mdsc;
4168        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4169
4170        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4171
4172        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4173}
4174
4175static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4176                                struct ceph_msg_header *hdr, int *skip)
4177{
4178        struct ceph_msg *msg;
4179        int type = (int) le16_to_cpu(hdr->type);
4180        int front_len = (int) le32_to_cpu(hdr->front_len);
4181
4182        if (con->in_msg)
4183                return con->in_msg;
4184
4185        *skip = 0;
4186        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4187        if (!msg) {
4188                pr_err("unable to allocate msg type %d len %d\n",
4189                       type, front_len);
4190                return NULL;
4191        }
4192
4193        return msg;
4194}
4195
4196static int mds_sign_message(struct ceph_msg *msg)
4197{
4198       struct ceph_mds_session *s = msg->con->private;
4199       struct ceph_auth_handshake *auth = &s->s_auth;
4200
4201       return ceph_auth_sign_message(auth, msg);
4202}
4203
4204static int mds_check_message_signature(struct ceph_msg *msg)
4205{
4206       struct ceph_mds_session *s = msg->con->private;
4207       struct ceph_auth_handshake *auth = &s->s_auth;
4208
4209       return ceph_auth_check_message_signature(auth, msg);
4210}
4211
4212static const struct ceph_connection_operations mds_con_ops = {
4213        .get = con_get,
4214        .put = con_put,
4215        .dispatch = dispatch,
4216        .get_authorizer = get_authorizer,
4217        .verify_authorizer_reply = verify_authorizer_reply,
4218        .invalidate_authorizer = invalidate_authorizer,
4219        .peer_reset = peer_reset,
4220        .alloc_msg = mds_alloc_msg,
4221        .sign_message = mds_sign_message,
4222        .check_message_signature = mds_check_message_signature,
4223};
4224
4225/* eof */
4226