linux/fs/ceph/mds_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/utsname.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        int nr_caps;
        struct ceph_pagelist *pagelist;
        bool flock;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = -EIO;

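        /*
         * The encoded inode is a fixed-size struct followed by a
         * variable number of fragtree split entries, so how far we
         * advance *p depends on nsplits in the payload itself.
         */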
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        if (features & CEPH_FEATURE_DIRLAYOUTHASH)
                ceph_decode_copy_safe(p, end, &info->dir_layout,
                                      sizeof(info->dir_layout), bad);
        else
                memset(&info->dir_layout, 0, sizeof(info->dir_layout));

        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
        } else
                info->inline_version = CEPH_INLINE_NONE;

        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                ceph_decode_need(p, end, info->pool_ns_len, bad);
                *p += info->pool_ns_len;
        } else {
                info->pool_ns_len = 0;
        }

        return 0;
bad:
        return err;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                if (unlikely(*p + sizeof(*info->dirfrag) > end))
                        goto bad;
                info->dirfrag = *p;
                *p += sizeof(*info->dirfrag) +
                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
                if (unlikely(*p > end))
                        goto bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;
                info->dlease = *p;
                *p += sizeof(*info->dlease);
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        info->dir_dir = *p;
        if (*p + sizeof(*info->dir_dir) > end)
                goto bad;
        *p += sizeof(*info->dir_dir) +
                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
        if (*p > end)
                goto bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        info->dir_end = ceph_decode_8(p);
        info->dir_complete = ceph_decode_8(p);
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_in);
        info->dir_dname = (void *)(info->dir_in + num);
        info->dir_dname_len = (void *)(info->dir_dname + num);
        info->dir_dlease = (void *)(info->dir_dname_len + num);
        if ((unsigned long)(info->dir_dlease + num) >
            (unsigned long)info->dir_in + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                /* dentry */
                ceph_decode_need(p, end, sizeof(u32)*2, bad);
                info->dir_dname_len[i] = ceph_decode_32(p);
                ceph_decode_need(p, end, info->dir_dname_len[i], bad);
                info->dir_dname[i] = *p;
                *p += info->dir_dname_len[i];
                dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
                     info->dir_dname[i]);
                info->dir_dlease[i] = *p;
                *p += sizeof(struct ceph_mds_reply_lease);

                /* inode */
                err = parse_reply_info_in(p, end, &info->dir_in[i], features);
                if (err < 0)
                        goto out_bad;
                i++;
                num--;
        }

done:
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;
        *p += sizeof(*info->filelock_reply);

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
                if (*p == end) {
                        info->has_create_ino = false;
                } else {
                        info->has_create_ino = true;
                        info->ino = ceph_decode_64(p);
                }
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (info->head->op == CEPH_MDS_OP_READDIR ||
                 info->head->op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_dir(p, end, info, features);
        else if (info->head->op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_in)
                return;
        free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        default: return "???";
        }
}

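/*
 * Take a reference on a session, but only if its refcount is already
 * non-zero: atomic_inc_not_zero() keeps us from resurrecting a session
 * that a concurrent final ceph_put_mds_session() is about to free.
 */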
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (atomic_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        struct ceph_mds_session *session;

        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
                return NULL;
        session = mdsc->sessions[mds];
        dout("lookup_mds_session %p %d\n", session,
             atomic_read(&session->s_ref));
        get_session(session);
        return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions)
                return false;
        return mdsc->sessions[mds];
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->m_max_mds)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 0;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        atomic_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

        dout("register_session mds%d\n", mds);
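        /*
         * The sessions array is indexed by mds rank and sized to the
         * highest rank seen so far, growing to the next power of two
         * when a larger rank shows up; slots for ranks we have never
         * spoken to stay NULL.
         */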
        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds+1);
                struct ceph_mds_session **sa;

                dout("register_session realloc to %d\n", newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (sa == NULL)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }
        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_locked_dir)
                ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                  req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                ihold(dir);
                req->r_unsafe_dir = dir;
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

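        /*
         * If this was the oldest outstanding request, advance
         * oldest_tid to the next non-SETFILELOCK request in tid order;
         * file lock requests are skipped for oldest-tid accounting,
         * matching __register_request above.
         */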
        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir && req->r_got_unsafe) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode && req->r_got_unsafe) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static struct dentry *get_nonsnap_parent(struct dentry *dentry)
{
        /*
         * we don't need to worry about protecting the d_parent access
         * here because we never rename inside the snapped namespace
         * except to resplice to another snapdir, and either the old or new
         * result is a valid result.
         */
        while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
                dentry = dentry->d_parent;
        return dentry;
}

static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = req->r_direct_is_hash;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent = req->r_dentry->d_parent;
                struct inode *dir = parent->d_inode;

                if (dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs! */
                        inode = req->r_dentry->d_inode;
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        struct dentry *dn = get_nonsnap_parent(parent);
                        inode = dn->d_inode;
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else {
                        /* dentry target */
                        inode = req->r_dentry->d_inode;
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = dir;
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        }
                }
        }

        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

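        /*
         * For a directory operation identified by (dir, name hash),
         * map the hash to a dirfrag and prefer the mds the frag tree
         * says serves that frag: a random replica when any mds will
         * do, or the authoritative mds otherwise.
         */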
        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        return mds;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        return mds;
                        }
                }
        }

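        /*
         * Fall back to whichever mds holds a cap for this inode:
         * prefer the auth cap when USE_AUTH_MDS was requested,
         * otherwise any cap will do; with no caps at all, pick a
         * random mds below.
         */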
        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int metadata_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        void *p;

        const char* metadata[][2] = {
                {"hostname", utsname()->nodename},
                {"kernel_version", utsname()->release},
                {"entity_id", opt->name ? opt->name : ""},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        metadata_bytes = 4;  /* map length */
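        /* each entry costs two u32 length prefixes plus the two strings */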
        for (i = 0; metadata[i][0] != NULL; ++i) {
                metadata_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v2
         */
        msg->hdr.version = cpu_to_le16(2);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p = msg->front.iov_base + sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0] != NULL; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING)
                __open_session(mdsc, session);

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->m_max_mds)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                if (!IS_ERR(ts))
                        ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

/* caller holds s_cap_lock, we drop it */
static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
        __releases(session->s_cap_lock)
{
        LIST_HEAD(tmp_list);
        list_splice_init(&session->s_cap_releases, &tmp_list);
        session->s_num_cap_releases = 0;
        spin_unlock(&session->s_cap_lock);

        dout("cleanup_cap_releases mds%d\n", session->s_mds);
        while (!list_empty(&tmp_list)) {
                struct ceph_cap *cap;
                /* drop each queued cap release without sending it */
                cap = list_first_entry(&tmp_list,
                                        struct ceph_cap, session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
        }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                                     struct ceph_mds_session *session)
{
        struct ceph_mds_request *req;
        struct rb_node *p;

        dout("cleanup_session_requests mds%d\n", session->s_mds);
        mutex_lock(&mdsc->mutex);
        while (!list_empty(&session->s_unsafe)) {
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                list_del_init(&req->r_unsafe_item);
                pr_warn_ratelimited(" dropping unsafe request %llu\n",
                                    req->r_tid);
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
        p = rb_first(&mdsc->request_tree);
        while (p) {
                req = rb_entry(p, struct ceph_mds_request, r_node);
                p = rb_next(p);
                if (req->r_session &&
                    req->r_session->s_mds == session->s_mds)
                        req->r_attempts = 0;
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

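                /*
                 * Release the previous iteration's inode and cap refs
                 * only now, outside s_cap_lock, since iput() may sleep
                 * (it can end up deleting the inode).
                 */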
                if (last_inode) {
                        iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                if (cap->ci == NULL) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        if (cap->queue_release) {
                                list_add_tail(&cap->session_caps,
                                              &session->s_cap_releases);
                                session->s_num_cap_releases++;
                        } else {
                                old_cap = cap;  /* put_cap it w/o locks held */
                        }
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        LIST_HEAD(to_remove);
        int drop = 0;

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
        __ceph_remove_cap(cap, false);
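        /*
         * With the auth cap gone there is no mds left to ack our
         * pending cap flushes, so discard the dirty/flushing state
         * and the queued cap_flush records for this inode.
         */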
        if (!ci->i_auth_cap) {
                struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;

                while (true) {
                        struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
                        if (!n)
                                break;
                        cf = rb_entry(n, struct ceph_cap_flush, i_node);
                        rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
                        list_add(&cf->list, &to_remove);
                }

                spin_lock(&mdsc->cap_dirty_lock);

                list_for_each_entry(cf, &to_remove, list)
                        rb_erase(&cf->g_node, &mdsc->cap_flush_tree);

                if (!list_empty(&ci->i_dirty_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = 1;
                }
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = 1;
                }
                spin_unlock(&mdsc->cap_dirty_lock);

                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
                        list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }
        }
        spin_unlock(&ci->i_ceph_lock);
        while (!list_empty(&to_remove)) {
                struct ceph_cap_flush *cf;
                cf = list_first_entry(&to_remove,
                                      struct ceph_cap_flush, list);
                list_del(&cf->list);
                ceph_free_cap_flush(cf);
        }
        while (drop--)
                iput(inode);
        return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, NULL);

        spin_lock(&session->s_cap_lock);
        if (session->s_nr_caps > 0) {
                struct super_block *sb = session->s_mdsc->fsc->sb;
                struct inode *inode;
                struct ceph_cap *cap, *prev = NULL;
                struct ceph_vino vino;
                /*
                 * iterate_session_caps() skips inodes that are being
                 * deleted, so we need to wait until deletions are
                 * complete.  __wait_on_freeing_inode() is designed for
                 * the job, but it is not exported, so use the inode
                 * lookup function to get the same effect.
                 */
                while (!list_empty(&session->s_caps)) {
                        cap = list_entry(session->s_caps.next,
                                         struct ceph_cap, session_caps);
                        if (cap == prev)
                                break;
                        prev = cap;
                        vino = cap->ci->i_vino;
                        spin_unlock(&session->s_cap_lock);

                        inode = ceph_find_inode(sb, vino);
                        iput(inode);

                        spin_lock(&session->s_cap_lock);
                }
        }

        /* drop cap expires and unlock s_cap_lock */
        cleanup_cap_releases(session->s_mdsc, session);

        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
}

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
                              void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);

        wake_up_all(&ci->i_cap_wq);
        if (arg) {
                spin_lock(&ci->i_ceph_lock);
                ci->i_wanted_max_size = 0;
                ci->i_requested_max_size = 0;
                spin_unlock(&ci->i_ceph_lock);
        }
        return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session,
                                 int reconnect)
{
        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
        iterate_session_caps(session, wake_up_session_cb,
                             (void *)(unsigned long)reconnect);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
                           struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int state;

        if (time_after_eq(jiffies, session->s_cap_ttl) &&
            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
                pr_info("mds%d caps stale\n", session->s_mds);
        session->s_renew_requested = jiffies;

        /* do not try to renew caps until a recovering mds has reconnected
         * with its clients. */
        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
        if (state < CEPH_MDS_STATE_RECONNECT) {
                dout("send_renew_caps ignoring mds%d (%s)\n",
                     session->s_mds, ceph_mds_state_name(state));
                return 0;
        }

        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
                ceph_mds_state_name(state));
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
                                 ++session->s_renew_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
                             struct ceph_mds_session *session, u64 seq)
{
        struct ceph_msg *msg;

        dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state), seq);
        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session, int is_renew)
{
        int was_stale;
        int wake = 0;

        spin_lock(&session->s_cap_lock);
        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

        session->s_cap_ttl = session->s_renew_requested +
                mdsc->mdsmap->m_session_timeout*HZ;

        if (was_stale) {
                if (time_before(jiffies, session->s_cap_ttl)) {
                        pr_info("mds%d caps renewed\n", session->s_mds);
                        wake = 1;
                } else {
                        pr_info("mds%d caps still stale\n", session->s_mds);
                }
        }
        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
             time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
        spin_unlock(&session->s_cap_lock);

        if (wake)
                wake_up_session_caps(session, 0);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session)
{
        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
                return 0;
        session->s_state = CEPH_MDS_SESSION_CLOSING;
        return request_close_session(mdsc, session);
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
        struct ceph_mds_session *session = arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int used, wanted, oissued, mine;

        if (session->s_trim_caps <= 0)
                return -1;

        spin_lock(&ci->i_ceph_lock);
        mine = cap->issued | cap->implemented;
        used = __ceph_caps_used(ci);
        wanted = __ceph_caps_file_wanted(ci);
        oissued = __ceph_caps_issued_other(ci, cap);

        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
             ceph_cap_string(used), ceph_cap_string(wanted));
        if (cap == ci->i_auth_cap) {
                if (ci->i_dirty_caps || ci->i_flushing_caps ||
                    !list_empty(&ci->i_cap_snaps))
                        goto out;
                if ((used | wanted) & CEPH_CAP_ANY_WR)
                        goto out;
        }
        /* The inode has cached pages, but it's no longer used.
         * We can safely drop it. */
        if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
            !(oissued & CEPH_CAP_FILE_CACHE)) {
                used = 0;
                oissued = 0;
        }
        if ((used | wanted) & ~oissued & mine)
                goto out;   /* we need these caps */

        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap, true);
        } else {
                /* try dropping referring dentries */
                spin_unlock(&ci->i_ceph_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
                     inode, cap, atomic_read(&inode->i_count));
                return 0;
        }

out:
        spin_unlock(&ci->i_ceph_lock);
        return 0;
}

/*
 * Trim session cap count down to some max number.
 */
static int trim_caps(struct ceph_mds_client *mdsc,
                     struct ceph_mds_session *session,
                     int max_caps)
{
        int trim_caps = session->s_nr_caps - max_caps;

        dout("trim_caps mds%d start: %d / %d, trim %d\n",
             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
        if (trim_caps > 0) {
                session->s_trim_caps = trim_caps;
                iterate_session_caps(session, trim_caps_cb, session);
                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
                     session->s_mds, session->s_nr_caps, max_caps,
                        trim_caps - session->s_trim_caps);
                session->s_trim_caps = 0;
        }

        ceph_send_cap_releases(mdsc, session);
        return 0;
}

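/*
 * Check whether this inode's oldest pending cap snapshot has flushed
 * past want_snap_seq; returns non-zero when there is nothing older
 * left to wait for.
 */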
static int check_capsnap_flush(struct ceph_inode_info *ci,
                               u64 want_snap_seq)
{
        int ret = 1;
        spin_lock(&ci->i_ceph_lock);
        if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
                struct ceph_cap_snap *capsnap =
                        list_first_entry(&ci->i_cap_snaps,
                                         struct ceph_cap_snap, ci_item);
                ret = capsnap->follows >= want_snap_seq;
        }
        spin_unlock(&ci->i_ceph_lock);
        return ret;
}

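/*
 * Check whether all cap flushes up to want_flush_tid have completed:
 * the flush tree is ordered by tid, so only the first entry needs to
 * be compared against want_flush_tid.
 */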
static int check_caps_flush(struct ceph_mds_client *mdsc,
                            u64 want_flush_tid)
{
        struct rb_node *n;
        struct ceph_cap_flush *cf;
        int ret = 1;

        spin_lock(&mdsc->cap_dirty_lock);
        n = rb_first(&mdsc->cap_flush_tree);
        cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
        if (cf && cf->tid <= want_flush_tid) {
                dout("check_caps_flush still flushing tid %llu <= %llu\n",
                     cf->tid, want_flush_tid);
                ret = 0;
        }
        spin_unlock(&mdsc->cap_dirty_lock);
        return ret;
}

/*
 * flush all dirty inode data to disk.
 *
 * Blocks until we've flushed through want_flush_tid (and, per
 * session, through want_snap_seq for cap snapshots).
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
                            u64 want_flush_tid, u64 want_snap_seq)
{
        int mds;

        dout("wait_caps_flush want %llu snap want %llu\n",
1509             want_flush_tid, want_snap_seq);
1510        mutex_lock(&mdsc->mutex);
1511        for (mds = 0; mds < mdsc->max_sessions; ) {
1512                struct ceph_mds_session *session = mdsc->sessions[mds];
1513                struct inode *inode = NULL;
1514
1515                if (!session) {
1516                        mds++;
1517                        continue;
1518                }
1519                get_session(session);
1520                mutex_unlock(&mdsc->mutex);
1521
1522                mutex_lock(&session->s_mutex);
1523                if (!list_empty(&session->s_cap_snaps_flushing)) {
1524                        struct ceph_cap_snap *capsnap =
1525                                list_first_entry(&session->s_cap_snaps_flushing,
1526                                                 struct ceph_cap_snap,
1527                                                 flushing_item);
1528                        struct ceph_inode_info *ci = capsnap->ci;
1529                        if (!check_capsnap_flush(ci, want_snap_seq)) {
                        dout("wait_caps_flush still flushing snap %p "
1531                                     "follows %lld <= %lld to mds%d\n",
1532                                     &ci->vfs_inode, capsnap->follows,
1533                                     want_snap_seq, mds);
1534                                inode = igrab(&ci->vfs_inode);
1535                        }
1536                }
1537                mutex_unlock(&session->s_mutex);
1538                ceph_put_mds_session(session);
1539
1540                if (inode) {
1541                        wait_event(mdsc->cap_flushing_wq,
1542                                   check_capsnap_flush(ceph_inode(inode),
1543                                                       want_snap_seq));
1544                        iput(inode);
1545                } else {
1546                        mds++;
1547                }
1548
1549                mutex_lock(&mdsc->mutex);
1550        }
1551        mutex_unlock(&mdsc->mutex);
1552
1553        wait_event(mdsc->cap_flushing_wq,
1554                   check_caps_flush(mdsc, want_flush_tid));
1555
        dout("wait_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1557}
1558
1559/*
1560 * called under s_mutex
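 *
 * Batch the queued cap releases into CEPH_MSG_CLIENT_CAPRELEASE
 * messages, sending one message per CEPH_CAPS_PER_RELEASE items.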
1561 */
1562void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1563                            struct ceph_mds_session *session)
1564{
1565        struct ceph_msg *msg = NULL;
1566        struct ceph_mds_cap_release *head;
1567        struct ceph_mds_cap_item *item;
1568        struct ceph_cap *cap;
1569        LIST_HEAD(tmp_list);
1570        int num_cap_releases;
1571
1572        spin_lock(&session->s_cap_lock);
1573again:
1574        list_splice_init(&session->s_cap_releases, &tmp_list);
1575        num_cap_releases = session->s_num_cap_releases;
1576        session->s_num_cap_releases = 0;
1577        spin_unlock(&session->s_cap_lock);
1578
1579        while (!list_empty(&tmp_list)) {
1580                if (!msg) {
1581                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1582                                        PAGE_CACHE_SIZE, GFP_NOFS, false);
1583                        if (!msg)
1584                                goto out_err;
1585                        head = msg->front.iov_base;
1586                        head->num = cpu_to_le32(0);
1587                        msg->front.iov_len = sizeof(*head);
1588                }
1589                cap = list_first_entry(&tmp_list, struct ceph_cap,
1590                                        session_caps);
1591                list_del(&cap->session_caps);
1592                num_cap_releases--;
1593
1594                head = msg->front.iov_base;
1595                le32_add_cpu(&head->num, 1);
1596                item = msg->front.iov_base + msg->front.iov_len;
1597                item->ino = cpu_to_le64(cap->cap_ino);
1598                item->cap_id = cpu_to_le64(cap->cap_id);
1599                item->migrate_seq = cpu_to_le32(cap->mseq);
1600                item->seq = cpu_to_le32(cap->issue_seq);
1601                msg->front.iov_len += sizeof(*item);
1602
1603                ceph_put_cap(mdsc, cap);
1604
1605                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1606                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1607                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1608                        ceph_con_send(&session->s_con, msg);
1609                        msg = NULL;
1610                }
1611        }
1612
1613        BUG_ON(num_cap_releases != 0);
1614
1615        spin_lock(&session->s_cap_lock);
1616        if (!list_empty(&session->s_cap_releases))
1617                goto again;
1618        spin_unlock(&session->s_cap_lock);
1619
1620        if (msg) {
1621                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1622                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1623                ceph_con_send(&session->s_con, msg);
1624        }
1625        return;
1626out_err:
1627        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1628                session->s_mds);
1629        spin_lock(&session->s_cap_lock);
1630        list_splice(&tmp_list, &session->s_cap_releases);
1631        session->s_num_cap_releases += num_cap_releases;
1632        spin_unlock(&session->s_cap_lock);
1633}
1634
1635/*
1636 * requests
1637 */
1638
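/*
 * Allocate a reply buffer for readdir, sized for the expected number
 * of directory entries (bounded by the max_readdir mount option).
 * Fall back to smaller page orders if a large allocation fails.
 */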
1639int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1640                                    struct inode *dir)
1641{
1642        struct ceph_inode_info *ci = ceph_inode(dir);
1643        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1644        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1645        size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1646                      sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1647        int order, num_entries;
1648
1649        spin_lock(&ci->i_ceph_lock);
1650        num_entries = ci->i_files + ci->i_subdirs;
1651        spin_unlock(&ci->i_ceph_lock);
1652        num_entries = max(num_entries, 1);
1653        num_entries = min(num_entries, opt->max_readdir);
1654
1655        order = get_order(size * num_entries);
1656        while (order >= 0) {
1657                rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
1658                                                        __GFP_NOWARN,
1659                                                        order);
1660                if (rinfo->dir_in)
1661                        break;
1662                order--;
1663        }
1664        if (!rinfo->dir_in)
1665                return -ENOMEM;
1666
1667        num_entries = (PAGE_SIZE << order) / size;
1668        num_entries = min(num_entries, opt->max_readdir);
1669
1670        rinfo->dir_buf_size = PAGE_SIZE << order;
1671        req->r_num_caps = num_entries + 1;
1672        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1673        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1674        return 0;
1675}
1676
1677/*
1678 * Create an mds request.
1679 */
1680struct ceph_mds_request *
1681ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1682{
1683        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1684
1685        if (!req)
1686                return ERR_PTR(-ENOMEM);
1687
1688        mutex_init(&req->r_fill_mutex);
1689        req->r_mdsc = mdsc;
1690        req->r_started = jiffies;
1691        req->r_resend_mds = -1;
1692        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1693        INIT_LIST_HEAD(&req->r_unsafe_target_item);
1694        req->r_fmode = -1;
1695        kref_init(&req->r_kref);
1696        RB_CLEAR_NODE(&req->r_node);
1697        INIT_LIST_HEAD(&req->r_wait);
1698        init_completion(&req->r_completion);
1699        init_completion(&req->r_safe_completion);
1700        INIT_LIST_HEAD(&req->r_unsafe_item);
1701
1702        req->r_stamp = current_fs_time(mdsc->fsc->sb);
1703
1704        req->r_op = op;
1705        req->r_direct_mode = mode;
1706        return req;
1707}
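
/*
 * A typical caller pattern, as a sketch (error handling elided; the
 * op, mode, and request fields are illustrative):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR,
 *				       USE_AUTH_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_inode = inode;
 *	ihold(inode);
 *	req->r_num_caps = 1;
 *	err = ceph_mdsc_do_request(mdsc, NULL, req);
 *	ceph_mdsc_put_request(req);
 */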
1708
1709/*
 * Return the oldest (lowest tid) request in the request tree, or NULL
 * if none.
1711 *
1712 * called under mdsc->mutex.
1713 */
1714static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1715{
1716        if (RB_EMPTY_ROOT(&mdsc->request_tree))
1717                return NULL;
1718        return rb_entry(rb_first(&mdsc->request_tree),
1719                        struct ceph_mds_request, r_node);
1720}
1721
static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1723{
1724        return mdsc->oldest_tid;
1725}
1726
1727/*
1728 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1729 * on build_path_from_dentry in fs/cifs/dir.c.
1730 *
1731 * If @stop_on_nosnap, generate path relative to the first non-snapped
1732 * inode.
1733 *
1734 * Encode hidden .snap dirs as a double /, i.e.
1735 *   foo/.snap/bar -> foo//bar
1736 */
1737char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1738                           int stop_on_nosnap)
1739{
1740        struct dentry *temp;
1741        char *path;
1742        int len, pos;
1743        unsigned seq;
1744
1745        if (dentry == NULL)
1746                return ERR_PTR(-EINVAL);
1747
1748retry:
1749        len = 0;
1750        seq = read_seqbegin(&rename_lock);
1751        rcu_read_lock();
1752        for (temp = dentry; !IS_ROOT(temp);) {
1753                struct inode *inode = temp->d_inode;
1754                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1755                        len++;  /* slash only */
1756                else if (stop_on_nosnap && inode &&
1757                         ceph_snap(inode) == CEPH_NOSNAP)
1758                        break;
1759                else
1760                        len += 1 + temp->d_name.len;
1761                temp = temp->d_parent;
1762        }
1763        rcu_read_unlock();
1764        if (len)
1765                len--;  /* no leading '/' */
1766
1767        path = kmalloc(len+1, GFP_NOFS);
1768        if (path == NULL)
1769                return ERR_PTR(-ENOMEM);
1770        pos = len;
1771        path[pos] = 0;  /* trailing null */
1772        rcu_read_lock();
1773        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1774                struct inode *inode;
1775
1776                spin_lock(&temp->d_lock);
1777                inode = temp->d_inode;
1778                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1779                        dout("build_path path+%d: %p SNAPDIR\n",
1780                             pos, temp);
1781                } else if (stop_on_nosnap && inode &&
1782                           ceph_snap(inode) == CEPH_NOSNAP) {
1783                        spin_unlock(&temp->d_lock);
1784                        break;
1785                } else {
1786                        pos -= temp->d_name.len;
1787                        if (pos < 0) {
1788                                spin_unlock(&temp->d_lock);
1789                                break;
1790                        }
1791                        strncpy(path + pos, temp->d_name.name,
1792                                temp->d_name.len);
1793                }
1794                spin_unlock(&temp->d_lock);
1795                if (pos)
1796                        path[--pos] = '/';
1797                temp = temp->d_parent;
1798        }
1799        rcu_read_unlock();
1800        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1801                pr_err("build_path did not end path lookup where "
1802                       "expected, namelen is %d, pos is %d\n", len, pos);
1803                /* presumably this is only possible if racing with a
           rename of one of the parent directories (we cannot
1805                   lock the dentries above us to prevent this, but
1806                   retrying should be harmless) */
1807                kfree(path);
1808                goto retry;
1809        }
1810
1811        *base = ceph_ino(temp->d_inode);
1812        *plen = len;
1813        dout("build_path on %p %d built %llx '%.*s'\n",
1814             dentry, d_count(dentry), *base, len, path);
1815        return path;
1816}
1817
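/*
 * Express a dentry as an (ino, path) pair: relative to its parent if
 * the parent is not snapped, otherwise as a path built up to the
 * nearest non-snapped ancestor.
 */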
1818static int build_dentry_path(struct dentry *dentry,
1819                             const char **ppath, int *ppathlen, u64 *pino,
1820                             int *pfreepath)
1821{
1822        char *path;
1823
1824        if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1825                *pino = ceph_ino(dentry->d_parent->d_inode);
1826                *ppath = dentry->d_name.name;
1827                *ppathlen = dentry->d_name.len;
1828                return 0;
1829        }
1830        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1831        if (IS_ERR(path))
1832                return PTR_ERR(path);
1833        *ppath = path;
1834        *pfreepath = 1;
1835        return 0;
1836}
1837
1838static int build_inode_path(struct inode *inode,
1839                            const char **ppath, int *ppathlen, u64 *pino,
1840                            int *pfreepath)
1841{
1842        struct dentry *dentry;
1843        char *path;
1844
1845        if (ceph_snap(inode) == CEPH_NOSNAP) {
1846                *pino = ceph_ino(inode);
1847                *ppathlen = 0;
1848                return 0;
1849        }
1850        dentry = d_find_alias(inode);
1851        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1852        dput(dentry);
1853        if (IS_ERR(path))
1854                return PTR_ERR(path);
1855        *ppath = path;
1856        *pfreepath = 1;
1857        return 0;
1858}
1859
1860/*
1861 * request arguments may be specified via an inode *, a dentry *, or
1862 * an explicit ino+path.
1863 */
1864static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1865                                  const char *rpath, u64 rino,
1866                                  const char **ppath, int *pathlen,
1867                                  u64 *ino, int *freepath)
1868{
1869        int r = 0;
1870
1871        if (rinode) {
1872                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1873                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1874                     ceph_snap(rinode));
1875        } else if (rdentry) {
1876                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1877                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1878                     *ppath);
1879        } else if (rpath || rino) {
1880                *ino = rino;
1881                *ppath = rpath;
1882                *pathlen = rpath ? strlen(rpath) : 0;
1883                dout(" path %.*s\n", *pathlen, rpath);
1884        }
1885
1886        return r;
1887}
1888
1889/*
1890 * called under mdsc->mutex
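 *
 * Build the request message: head, two encoded filepaths, any cap
 * releases, and a timestamp.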
1891 */
1892static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1893                                               struct ceph_mds_request *req,
1894                                               int mds, bool drop_cap_releases)
1895{
1896        struct ceph_msg *msg;
1897        struct ceph_mds_request_head *head;
1898        const char *path1 = NULL;
1899        const char *path2 = NULL;
1900        u64 ino1 = 0, ino2 = 0;
1901        int pathlen1 = 0, pathlen2 = 0;
1902        int freepath1 = 0, freepath2 = 0;
1903        int len;
1904        u16 releases;
1905        void *p, *end;
1906        int ret;
1907
1908        ret = set_request_path_attr(req->r_inode, req->r_dentry,
1909                              req->r_path1, req->r_ino1.ino,
1910                              &path1, &pathlen1, &ino1, &freepath1);
1911        if (ret < 0) {
1912                msg = ERR_PTR(ret);
1913                goto out;
1914        }
1915
1916        ret = set_request_path_attr(NULL, req->r_old_dentry,
1917                              req->r_path2, req->r_ino2.ino,
1918                              &path2, &pathlen2, &ino2, &freepath2);
1919        if (ret < 0) {
1920                msg = ERR_PTR(ret);
1921                goto out_free1;
1922        }
1923
1924        len = sizeof(*head) +
1925                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1926                sizeof(struct ceph_timespec);
1927
1928        /* calculate (max) length for cap releases */
1929        len += sizeof(struct ceph_mds_request_release) *
1930                (!!req->r_inode_drop + !!req->r_dentry_drop +
1931                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1932        if (req->r_dentry_drop)
1933                len += req->r_dentry->d_name.len;
1934        if (req->r_old_dentry_drop)
1935                len += req->r_old_dentry->d_name.len;
1936
1937        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1938        if (!msg) {
1939                msg = ERR_PTR(-ENOMEM);
1940                goto out_free2;
1941        }
1942
1943        msg->hdr.version = cpu_to_le16(2);
1944        msg->hdr.tid = cpu_to_le64(req->r_tid);
1945
1946        head = msg->front.iov_base;
1947        p = msg->front.iov_base + sizeof(*head);
1948        end = msg->front.iov_base + msg->front.iov_len;
1949
1950        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1951        head->op = cpu_to_le32(req->r_op);
1952        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1953        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1954        head->args = req->r_args;
1955
1956        ceph_encode_filepath(&p, end, ino1, path1);
1957        ceph_encode_filepath(&p, end, ino2, path2);
1958
1959        /* make note of release offset, in case we need to replay */
1960        req->r_request_release_offset = p - msg->front.iov_base;
1961
1962        /* cap releases */
1963        releases = 0;
1964        if (req->r_inode_drop)
1965                releases += ceph_encode_inode_release(&p,
1966                      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1967                      mds, req->r_inode_drop, req->r_inode_unless, 0);
1968        if (req->r_dentry_drop)
1969                releases += ceph_encode_dentry_release(&p, req->r_dentry,
1970                       mds, req->r_dentry_drop, req->r_dentry_unless);
1971        if (req->r_old_dentry_drop)
1972                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1973                       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1974        if (req->r_old_inode_drop)
1975                releases += ceph_encode_inode_release(&p,
1976                      req->r_old_dentry->d_inode,
1977                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1978
1979        if (drop_cap_releases) {
1980                releases = 0;
1981                p = msg->front.iov_base + req->r_request_release_offset;
1982        }
1983
1984        head->num_releases = cpu_to_le16(releases);
1985
1986        /* time stamp */
1987        {
1988                struct ceph_timespec ts;
1989                ceph_encode_timespec(&ts, &req->r_stamp);
1990                ceph_encode_copy(&p, &ts, sizeof(ts));
1991        }
1992
1993        BUG_ON(p > end);
1994        msg->front.iov_len = p - msg->front.iov_base;
1995        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1996
1997        if (req->r_pagelist) {
1998                struct ceph_pagelist *pagelist = req->r_pagelist;
1999                atomic_inc(&pagelist->refcnt);
2000                ceph_msg_data_add_pagelist(msg, pagelist);
2001                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2002        } else {
2003                msg->hdr.data_len = 0;
2004        }
2005
2006        msg->hdr.data_off = cpu_to_le16(0);
2007
2008out_free2:
2009        if (freepath2)
2010                kfree((char *)path2);
2011out_free1:
2012        if (freepath1)
2013                kfree((char *)path1);
2014out:
2015        return msg;
2016}
2017
2018/*
 * called under mdsc->mutex on error, with no mutex held on
 * success.
2021 */
2022static void complete_request(struct ceph_mds_client *mdsc,
2023                             struct ceph_mds_request *req)
2024{
2025        if (req->r_callback)
2026                req->r_callback(mdsc, req);
2027        else
2028                complete_all(&req->r_completion);
2029}
2030
2031/*
2032 * called under mdsc->mutex
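 *
 * For replays of unsafe requests the original message is reused, so
 * that paths need not be rebuilt (d_move mangles the source name of a
 * rename); otherwise a fresh request message is created.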
2033 */
2034static int __prepare_send_request(struct ceph_mds_client *mdsc,
2035                                  struct ceph_mds_request *req,
2036                                  int mds, bool drop_cap_releases)
2037{
2038        struct ceph_mds_request_head *rhead;
2039        struct ceph_msg *msg;
2040        int flags = 0;
2041
2042        req->r_attempts++;
2043        if (req->r_inode) {
2044                struct ceph_cap *cap =
2045                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2046
2047                if (cap)
2048                        req->r_sent_on_mseq = cap->mseq;
2049                else
2050                        req->r_sent_on_mseq = -1;
2051        }
2052        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2053             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2054
2055        if (req->r_got_unsafe) {
2056                void *p;
2057                /*
2058                 * Replay.  Do not regenerate message (and rebuild
2059                 * paths, etc.); just use the original message.
2060                 * Rebuilding paths will break for renames because
2061                 * d_move mangles the src name.
2062                 */
2063                msg = req->r_request;
2064                rhead = msg->front.iov_base;
2065
2066                flags = le32_to_cpu(rhead->flags);
2067                flags |= CEPH_MDS_FLAG_REPLAY;
2068                rhead->flags = cpu_to_le32(flags);
2069
2070                if (req->r_target_inode)
2071                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2072
2073                rhead->num_retry = req->r_attempts - 1;
2074
2075                /* remove cap/dentry releases from message */
2076                rhead->num_releases = 0;
2077
2078                /* time stamp */
2079                p = msg->front.iov_base + req->r_request_release_offset;
2080                {
2081                        struct ceph_timespec ts;
2082                        ceph_encode_timespec(&ts, &req->r_stamp);
2083                        ceph_encode_copy(&p, &ts, sizeof(ts));
2084                }
2085
2086                msg->front.iov_len = p - msg->front.iov_base;
2087                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2088                return 0;
2089        }
2090
2091        if (req->r_request) {
2092                ceph_msg_put(req->r_request);
2093                req->r_request = NULL;
2094        }
2095        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2096        if (IS_ERR(msg)) {
2097                req->r_err = PTR_ERR(msg);
2098                return PTR_ERR(msg);
2099        }
2100        req->r_request = msg;
2101
2102        rhead = msg->front.iov_base;
2103        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2104        if (req->r_got_unsafe)
2105                flags |= CEPH_MDS_FLAG_REPLAY;
2106        if (req->r_locked_dir)
2107                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2108        rhead->flags = cpu_to_le32(flags);
2109        rhead->num_fwd = req->r_num_fwd;
2110        rhead->num_retry = req->r_attempts - 1;
2111        rhead->ino = 0;
2112
2113        dout(" r_locked_dir = %p\n", req->r_locked_dir);
2114        return 0;
2115}
2116
2117/*
2118 * send request, or put it on the appropriate wait list.
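 * If no MDS is available or the session is still being opened, the
 * request is parked on waiting_for_map or the session's s_waiting
 * list instead.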
2119 */
2120static int __do_request(struct ceph_mds_client *mdsc,
2121                        struct ceph_mds_request *req)
2122{
2123        struct ceph_mds_session *session = NULL;
2124        int mds = -1;
2125        int err = 0;
2126
2127        if (req->r_err || req->r_got_result) {
2128                if (req->r_aborted)
2129                        __unregister_request(mdsc, req);
2130                goto out;
2131        }
2132
2133        if (req->r_timeout &&
2134            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2135                dout("do_request timed out\n");
2136                err = -EIO;
2137                goto finish;
2138        }
2139        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2140                dout("do_request forced umount\n");
2141                err = -EIO;
2142                goto finish;
2143        }
2144
2145        put_request_session(req);
2146
2147        mds = __choose_mds(mdsc, req);
2148        if (mds < 0 ||
2149            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2150                dout("do_request no mds or not active, waiting for map\n");
2151                list_add(&req->r_wait, &mdsc->waiting_for_map);
2152                goto out;
2153        }
2154
2155        /* get, open session */
2156        session = __ceph_lookup_mds_session(mdsc, mds);
2157        if (!session) {
2158                session = register_session(mdsc, mds);
2159                if (IS_ERR(session)) {
2160                        err = PTR_ERR(session);
2161                        goto finish;
2162                }
2163        }
2164        req->r_session = get_session(session);
2165
2166        dout("do_request mds%d session %p state %s\n", mds, session,
2167             ceph_session_state_name(session->s_state));
2168        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2169            session->s_state != CEPH_MDS_SESSION_HUNG) {
2170                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2171                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2172                        __open_session(mdsc, session);
2173                list_add(&req->r_wait, &session->s_waiting);
2174                goto out_session;
2175        }
2176
2177        /* send request */
2178        req->r_resend_mds = -1;   /* forget any previous mds hint */
2179
2180        if (req->r_request_started == 0)   /* note request start time */
2181                req->r_request_started = jiffies;
2182
2183        err = __prepare_send_request(mdsc, req, mds, false);
2184        if (!err) {
2185                ceph_msg_get(req->r_request);
2186                ceph_con_send(&session->s_con, req->r_request);
2187        }
2188
2189out_session:
2190        ceph_put_mds_session(session);
2191finish:
2192        if (err) {
2193                dout("__do_request early error %d\n", err);
2194                req->r_err = err;
2195                complete_request(mdsc, req);
2196                __unregister_request(mdsc, req);
2197        }
2198out:
2199        return err;
2200}
2201
2202/*
2203 * called under mdsc->mutex
2204 */
2205static void __wake_requests(struct ceph_mds_client *mdsc,
2206                            struct list_head *head)
2207{
2208        struct ceph_mds_request *req;
2209        LIST_HEAD(tmp_list);
2210
2211        list_splice_init(head, &tmp_list);
2212
2213        while (!list_empty(&tmp_list)) {
2214                req = list_entry(tmp_list.next,
2215                                 struct ceph_mds_request, r_wait);
2216                list_del_init(&req->r_wait);
2217                dout(" wake request %p tid %llu\n", req, req->r_tid);
2218                __do_request(mdsc, req);
2219        }
2220}
2221
2222/*
2223 * Wake up threads with requests pending for @mds, so that they can
2224 * resubmit their requests to a possibly different mds.
2225 */
2226static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2227{
2228        struct ceph_mds_request *req;
2229        struct rb_node *p = rb_first(&mdsc->request_tree);
2230
2231        dout("kick_requests mds%d\n", mds);
2232        while (p) {
2233                req = rb_entry(p, struct ceph_mds_request, r_node);
2234                p = rb_next(p);
2235                if (req->r_got_unsafe)
2236                        continue;
2237                if (req->r_attempts > 0)
2238                        continue; /* only new requests */
2239                if (req->r_session &&
2240                    req->r_session->s_mds == mds) {
2241                        dout(" kicking tid %llu\n", req->r_tid);
2242                        list_del_init(&req->r_wait);
2243                        __do_request(mdsc, req);
2244                }
2245        }
2246}
2247
2248void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2249                              struct ceph_mds_request *req)
2250{
2251        dout("submit_request on %p\n", req);
2252        mutex_lock(&mdsc->mutex);
2253        __register_request(mdsc, req, NULL);
2254        __do_request(mdsc, req);
2255        mutex_unlock(&mdsc->mutex);
2256}
2257
2258/*
 * Synchronously perform an mds request.  Take care of all of the
2260 * session setup, forwarding, retry details.
2261 */
2262int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2263                         struct inode *dir,
2264                         struct ceph_mds_request *req)
2265{
2266        int err;
2267
2268        dout("do_request on %p\n", req);
2269
        /* deny access to directories with pool_ns layouts (do this
         * before taking any cap refs so an early return does not leak
         * them) */
        if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
            ceph_inode(req->r_inode)->i_pool_ns_len)
                return -EIO;
        if (req->r_locked_dir &&
            ceph_inode(req->r_locked_dir)->i_pool_ns_len)
                return -EIO;

        /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
        if (req->r_inode)
                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
        if (req->r_locked_dir)
                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        if (req->r_old_dentry_dir)
                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
2286
2287        /* issue */
2288        mutex_lock(&mdsc->mutex);
2289        __register_request(mdsc, req, dir);
2290        __do_request(mdsc, req);
2291
2292        if (req->r_err) {
2293                err = req->r_err;
2294                goto out;
2295        }
2296
2297        /* wait */
2298        mutex_unlock(&mdsc->mutex);
2299        dout("do_request waiting\n");
2300        if (!req->r_timeout && req->r_wait_for_completion) {
2301                err = req->r_wait_for_completion(mdsc, req);
2302        } else {
2303                long timeleft = wait_for_completion_killable_timeout(
2304                                        &req->r_completion,
2305                                        ceph_timeout_jiffies(req->r_timeout));
2306                if (timeleft > 0)
2307                        err = 0;
2308                else if (!timeleft)
2309                        err = -EIO;  /* timed out */
2310                else
2311                        err = timeleft;  /* killed */
2312        }
2313        dout("do_request waited, got %d\n", err);
2314        mutex_lock(&mdsc->mutex);
2315
2316        /* only abort if we didn't race with a real reply */
2317        if (req->r_got_result) {
2318                err = le32_to_cpu(req->r_reply_info.head->result);
2319        } else if (err < 0) {
2320                dout("aborted request %lld with %d\n", req->r_tid, err);
2321
2322                /*
2323                 * ensure we aren't running concurrently with
2324                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2325                 * rely on locks (dir mutex) held by our caller.
2326                 */
2327                mutex_lock(&req->r_fill_mutex);
2328                req->r_err = err;
2329                req->r_aborted = true;
2330                mutex_unlock(&req->r_fill_mutex);
2331
2332                if (req->r_locked_dir &&
2333                    (req->r_op & CEPH_MDS_OP_WRITE))
2334                        ceph_invalidate_dir_request(req);
2335        } else {
2336                err = req->r_err;
2337        }
2338
2339out:
2340        mutex_unlock(&mdsc->mutex);
2341        dout("do_request %p done, result %d\n", req, err);
2342        return err;
2343}
2344
2345/*
2346 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2347 * namespace request.
2348 */
2349void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2350{
2351        struct inode *inode = req->r_locked_dir;
2352
2353        dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2354
2355        ceph_dir_clear_complete(inode);
2356        if (req->r_dentry)
2357                ceph_invalidate_dentry_lease(req->r_dentry);
2358        if (req->r_old_dentry)
2359                ceph_invalidate_dentry_lease(req->r_old_dentry);
2360}
2361
2362/*
2363 * Handle mds reply.
2364 *
2365 * We take the session mutex and parse and process the reply immediately.
2366 * This preserves the logical ordering of replies, capabilities, etc., sent
2367 * by the MDS as they are applied to our local cache.
2368 */
2369static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2370{
2371        struct ceph_mds_client *mdsc = session->s_mdsc;
2372        struct ceph_mds_request *req;
2373        struct ceph_mds_reply_head *head = msg->front.iov_base;
2374        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2375        struct ceph_snap_realm *realm;
2376        u64 tid;
2377        int err, result;
2378        int mds = session->s_mds;
2379
2380        if (msg->front.iov_len < sizeof(*head)) {
2381                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2382                ceph_msg_dump(msg);
2383                return;
2384        }
2385
2386        /* get request, session */
2387        tid = le64_to_cpu(msg->hdr.tid);
2388        mutex_lock(&mdsc->mutex);
2389        req = lookup_get_request(mdsc, tid);
2390        if (!req) {
2391                dout("handle_reply on unknown tid %llu\n", tid);
2392                mutex_unlock(&mdsc->mutex);
2393                return;
2394        }
2395        dout("handle_reply %p\n", req);
2396
2397        /* correct session? */
2398        if (req->r_session != session) {
2399                pr_err("mdsc_handle_reply got %llu on session mds%d"
2400                       " not mds%d\n", tid, session->s_mds,
2401                       req->r_session ? req->r_session->s_mds : -1);
2402                mutex_unlock(&mdsc->mutex);
2403                goto out;
2404        }
2405
2406        /* dup? */
2407        if ((req->r_got_unsafe && !head->safe) ||
2408            (req->r_got_safe && head->safe)) {
2409                pr_warn("got a dup %s reply on %llu from mds%d\n",
2410                           head->safe ? "safe" : "unsafe", tid, mds);
2411                mutex_unlock(&mdsc->mutex);
2412                goto out;
2413        }
2414        if (req->r_got_safe) {
2415                pr_warn("got unsafe after safe on %llu from mds%d\n",
2416                           tid, mds);
2417                mutex_unlock(&mdsc->mutex);
2418                goto out;
2419        }
2420
2421        result = le32_to_cpu(head->result);
2422
        /*
         * Handle an ESTALE:
         * - if we're not talking to the authority, send the request there
         * - if the authority has changed while we weren't looking,
         *   send to the new authority
         * - otherwise we just have to return the ESTALE
         */
        if (result == -ESTALE) {
                dout("got ESTALE on request %llu\n", req->r_tid);
                req->r_resend_mds = -1;
                if (req->r_direct_mode != USE_AUTH_MDS) {
                        dout("not using auth, setting for that now\n");
                        req->r_direct_mode = USE_AUTH_MDS;
                        __do_request(mdsc, req);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                } else {
                        int mds = __choose_mds(mdsc, req);
                        if (mds >= 0 && mds != req->r_session->s_mds) {
                                dout("but auth changed, so resending\n");
                                __do_request(mdsc, req);
                                mutex_unlock(&mdsc->mutex);
                                goto out;
                        }
                }
                dout("have to return ESTALE on request %llu\n", req->r_tid);
        }
2450
2451
2452        if (head->safe) {
2453                req->r_got_safe = true;
2454                __unregister_request(mdsc, req);
2455
2456                if (req->r_got_unsafe) {
2457                        /*
2458                         * We already handled the unsafe response, now do the
2459                         * cleanup.  No need to examine the response; the MDS
2460                         * doesn't include any result info in the safe
2461                         * response.  And even if it did, there is nothing
2462                         * useful we could do with a revised return value.
2463                         */
2464                        dout("got safe reply %llu, mds%d\n", tid, mds);
2465                        list_del_init(&req->r_unsafe_item);
2466
2467                        /* last unsafe request during umount? */
2468                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2469                                complete_all(&mdsc->safe_umount_waiters);
2470                        mutex_unlock(&mdsc->mutex);
2471                        goto out;
2472                }
2473        } else {
2474                req->r_got_unsafe = true;
2475                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2476                if (req->r_unsafe_dir) {
2477                        struct ceph_inode_info *ci =
2478                                        ceph_inode(req->r_unsafe_dir);
2479                        spin_lock(&ci->i_unsafe_lock);
2480                        list_add_tail(&req->r_unsafe_dir_item,
2481                                      &ci->i_unsafe_dirops);
2482                        spin_unlock(&ci->i_unsafe_lock);
2483                }
2484        }
2485
2486        dout("handle_reply tid %lld result %d\n", tid, result);
2487        rinfo = &req->r_reply_info;
2488        err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2489        mutex_unlock(&mdsc->mutex);
2490
2491        mutex_lock(&session->s_mutex);
2492        if (err < 0) {
2493                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2494                ceph_msg_dump(msg);
2495                goto out_err;
2496        }
2497
2498        /* snap trace */
2499        realm = NULL;
2500        if (rinfo->snapblob_len) {
2501                down_write(&mdsc->snap_rwsem);
2502                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2503                                rinfo->snapblob + rinfo->snapblob_len,
2504                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2505                                &realm);
2506                downgrade_write(&mdsc->snap_rwsem);
2507        } else {
2508                down_read(&mdsc->snap_rwsem);
2509        }
2510
2511        /* insert trace into our cache */
2512        mutex_lock(&req->r_fill_mutex);
2513        current->journal_info = req;
2514        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2515        if (err == 0) {
2516                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2517                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2518                        ceph_readdir_prepopulate(req, req->r_session);
2519                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2520        }
2521        current->journal_info = NULL;
2522        mutex_unlock(&req->r_fill_mutex);
2523
2524        up_read(&mdsc->snap_rwsem);
2525        if (realm)
2526                ceph_put_snap_realm(mdsc, realm);
2527
2528        if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
2529                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2530                spin_lock(&ci->i_unsafe_lock);
2531                list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2532                spin_unlock(&ci->i_unsafe_lock);
2533        }
2534out_err:
2535        mutex_lock(&mdsc->mutex);
2536        if (!req->r_aborted) {
2537                if (err) {
2538                        req->r_err = err;
2539                } else {
                        req->r_reply = ceph_msg_get(msg);
2541                        req->r_got_result = true;
2542                }
2543        } else {
2544                dout("reply arrived after request %lld was aborted\n", tid);
2545        }
2546        mutex_unlock(&mdsc->mutex);
2547
2548        mutex_unlock(&session->s_mutex);
2549
2550        /* kick calling process */
2551        complete_request(mdsc, req);
2552out:
2553        ceph_mdsc_put_request(req);
2554        return;
2555}
2556
2557
2558
2559/*
2560 * handle mds notification that our request has been forwarded.
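 *
 * If the forward seq is new, reset the request and resend it to the
 * mds named in the message.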
2561 */
2562static void handle_forward(struct ceph_mds_client *mdsc,
2563                           struct ceph_mds_session *session,
2564                           struct ceph_msg *msg)
2565{
2566        struct ceph_mds_request *req;
2567        u64 tid = le64_to_cpu(msg->hdr.tid);
2568        u32 next_mds;
2569        u32 fwd_seq;
2570        int err = -EINVAL;
2571        void *p = msg->front.iov_base;
2572        void *end = p + msg->front.iov_len;
2573
2574        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2575        next_mds = ceph_decode_32(&p);
2576        fwd_seq = ceph_decode_32(&p);
2577
2578        mutex_lock(&mdsc->mutex);
2579        req = lookup_get_request(mdsc, tid);
2580        if (!req) {
2581                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2582                goto out;  /* dup reply? */
2583        }
2584
2585        if (req->r_aborted) {
2586                dout("forward tid %llu aborted, unregistering\n", tid);
2587                __unregister_request(mdsc, req);
2588        } else if (fwd_seq <= req->r_num_fwd) {
                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
                     tid, next_mds, fwd_seq, req->r_num_fwd);
2591        } else {
2592                /* resend. forward race not possible; mds would drop */
2593                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2594                BUG_ON(req->r_err);
2595                BUG_ON(req->r_got_result);
2596                req->r_attempts = 0;
2597                req->r_num_fwd = fwd_seq;
2598                req->r_resend_mds = next_mds;
2599                put_request_session(req);
2600                __do_request(mdsc, req);
2601        }
2602        ceph_mdsc_put_request(req);
2603out:
2604        mutex_unlock(&mdsc->mutex);
2605        return;
2606
2607bad:
2608        pr_err("mdsc_handle_forward decode error err=%d\n", err);
2609}
2610
2611/*
 * handle an mds session control message
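 * (session open/close acks, cap renewal and recall, stale and
 * readonly notifications, etc.)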
2613 */
2614static void handle_session(struct ceph_mds_session *session,
2615                           struct ceph_msg *msg)
2616{
2617        struct ceph_mds_client *mdsc = session->s_mdsc;
2618        u32 op;
2619        u64 seq;
2620        int mds = session->s_mds;
2621        struct ceph_mds_session_head *h = msg->front.iov_base;
2622        int wake = 0;
2623
2624        /* decode */
2625        if (msg->front.iov_len != sizeof(*h))
2626                goto bad;
2627        op = le32_to_cpu(h->op);
2628        seq = le64_to_cpu(h->seq);
2629
2630        mutex_lock(&mdsc->mutex);
2631        if (op == CEPH_SESSION_CLOSE)
2632                __unregister_session(mdsc, session);
2633        /* FIXME: this ttl calculation is generous */
2634        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2635        mutex_unlock(&mdsc->mutex);
2636
2637        mutex_lock(&session->s_mutex);
2638
2639        dout("handle_session mds%d %s %p state %s seq %llu\n",
2640             mds, ceph_session_op_name(op), session,
2641             ceph_session_state_name(session->s_state), seq);
2642
2643        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2644                session->s_state = CEPH_MDS_SESSION_OPEN;
2645                pr_info("mds%d came back\n", session->s_mds);
2646        }
2647
2648        switch (op) {
2649        case CEPH_SESSION_OPEN:
2650                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2651                        pr_info("mds%d reconnect success\n", session->s_mds);
2652                session->s_state = CEPH_MDS_SESSION_OPEN;
2653                renewed_caps(mdsc, session, 0);
2654                wake = 1;
2655                if (mdsc->stopping)
2656                        __close_session(mdsc, session);
2657                break;
2658
2659        case CEPH_SESSION_RENEWCAPS:
2660                if (session->s_renew_seq == seq)
2661                        renewed_caps(mdsc, session, 1);
2662                break;
2663
2664        case CEPH_SESSION_CLOSE:
2665                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2666                        pr_info("mds%d reconnect denied\n", session->s_mds);
2667                cleanup_session_requests(mdsc, session);
2668                remove_session_caps(session);
2669                wake = 2; /* for good measure */
2670                wake_up_all(&mdsc->session_close_wq);
2671                break;
2672
2673        case CEPH_SESSION_STALE:
2674                pr_info("mds%d caps went stale, renewing\n",
2675                        session->s_mds);
2676                spin_lock(&session->s_gen_ttl_lock);
2677                session->s_cap_gen++;
2678                session->s_cap_ttl = jiffies - 1;
2679                spin_unlock(&session->s_gen_ttl_lock);
2680                send_renew_caps(mdsc, session);
2681                break;
2682
2683        case CEPH_SESSION_RECALL_STATE:
2684                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2685                break;
2686
2687        case CEPH_SESSION_FLUSHMSG:
2688                send_flushmsg_ack(mdsc, session, seq);
2689                break;
2690
2691        case CEPH_SESSION_FORCE_RO:
2692                dout("force_session_readonly %p\n", session);
2693                spin_lock(&session->s_cap_lock);
2694                session->s_readonly = true;
2695                spin_unlock(&session->s_cap_lock);
2696                wake_up_session_caps(session, 0);
2697                break;
2698
2699        default:
2700                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2701                WARN_ON(1);
2702        }
2703
2704        mutex_unlock(&session->s_mutex);
2705        if (wake) {
2706                mutex_lock(&mdsc->mutex);
2707                __wake_requests(mdsc, &session->s_waiting);
2708                if (wake == 2)
2709                        kick_requests(mdsc, mds);
2710                mutex_unlock(&mdsc->mutex);
2711        }
2712        return;
2713
2714bad:
2715        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2716               (int)msg->front.iov_len);
2717        ceph_msg_dump(msg);
2718        return;
2719}
2720
2721
2722/*
2723 * called under session->mutex.
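 *
 * Re-send unsafe requests, plus any previously attempted requests, so
 * that the recovering MDS can replay them during its clientreplay
 * stage.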
2724 */
2725static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2726                                   struct ceph_mds_session *session)
2727{
2728        struct ceph_mds_request *req, *nreq;
2729        struct rb_node *p;
2730        int err;
2731
2732        dout("replay_unsafe_requests mds%d\n", session->s_mds);
2733
2734        mutex_lock(&mdsc->mutex);
2735        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2736                err = __prepare_send_request(mdsc, req, session->s_mds, true);
2737                if (!err) {
2738                        ceph_msg_get(req->r_request);
2739                        ceph_con_send(&session->s_con, req->r_request);
2740                }
2741        }
2742
2743        /*
         * Also re-send old requests when the MDS enters the reconnect stage,
         * so that it can process completed requests in the clientreplay stage.
2746         */
2747        p = rb_first(&mdsc->request_tree);
2748        while (p) {
2749                req = rb_entry(p, struct ceph_mds_request, r_node);
2750                p = rb_next(p);
2751                if (req->r_got_unsafe)
2752                        continue;
2753                if (req->r_attempts == 0)
2754                        continue; /* only old requests */
2755                if (req->r_session &&
2756                    req->r_session->s_mds == session->s_mds) {
2757                        err = __prepare_send_request(mdsc, req,
2758                                                     session->s_mds, true);
2759                        if (!err) {
2760                                ceph_msg_get(req->r_request);
2761                                ceph_con_send(&session->s_con, req->r_request);
2762                        }
2763                }
2764        }
2765        mutex_unlock(&mdsc->mutex);
2766}
2767
2768/*
2769 * Encode information about a cap for a reconnect with the MDS.
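 *
 * Emits a v2 record (including file lock state) when the peer
 * advertises CEPH_FEATURE_FLOCK, and a v1 record otherwise.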
2770 */
2771static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2772                          void *arg)
2773{
2774        union {
2775                struct ceph_mds_cap_reconnect v2;
2776                struct ceph_mds_cap_reconnect_v1 v1;
2777        } rec;
2778        size_t reclen;
2779        struct ceph_inode_info *ci;
2780        struct ceph_reconnect_state *recon_state = arg;
2781        struct ceph_pagelist *pagelist = recon_state->pagelist;
2782        char *path;
2783        int pathlen, err;
2784        u64 pathbase;
2785        struct dentry *dentry;
2786
2787        ci = cap->ci;
2788
2789        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2790             inode, ceph_vinop(inode), cap, cap->cap_id,
2791             ceph_cap_string(cap->issued));
2792        err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2793        if (err)
2794                return err;
2795
2796        dentry = d_find_alias(inode);
2797        if (dentry) {
2798                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2799                if (IS_ERR(path)) {
2800                        err = PTR_ERR(path);
2801                        goto out_dput;
2802                }
2803        } else {
2804                path = NULL;
2805                pathlen = 0;
2806        }
2807        err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2808        if (err)
2809                goto out_free;
2810
2811        spin_lock(&ci->i_ceph_lock);
2812        cap->seq = 0;        /* reset cap seq */
2813        cap->issue_seq = 0;  /* and issue_seq */
2814        cap->mseq = 0;       /* and migrate_seq */
2815        cap->cap_gen = cap->session->s_cap_gen;
2816
2817        if (recon_state->flock) {
2818                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2819                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2820                rec.v2.issued = cpu_to_le32(cap->issued);
2821                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2822                rec.v2.pathbase = cpu_to_le64(pathbase);
2823                rec.v2.flock_len = 0;
2824                reclen = sizeof(rec.v2);
2825        } else {
2826                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2827                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2828                rec.v1.issued = cpu_to_le32(cap->issued);
2829                rec.v1.size = cpu_to_le64(inode->i_size);
2830                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2831                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2832                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2833                rec.v1.pathbase = cpu_to_le64(pathbase);
2834                reclen = sizeof(rec.v1);
2835        }
2836        spin_unlock(&ci->i_ceph_lock);
2837
2838        if (recon_state->flock) {
2839                int num_fcntl_locks, num_flock_locks;
2840                struct ceph_filelock *flocks;
2841
2842encode_again:
2843                spin_lock(&inode->i_lock);
2844                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2845                spin_unlock(&inode->i_lock);
2846                flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2847                                 sizeof(struct ceph_filelock), GFP_NOFS);
2848                if (!flocks) {
2849                        err = -ENOMEM;
2850                        goto out_free;
2851                }
2852                spin_lock(&inode->i_lock);
2853                err = ceph_encode_locks_to_buffer(inode, flocks,
2854                                                  num_fcntl_locks,
2855                                                  num_flock_locks);
2856                spin_unlock(&inode->i_lock);
2857                if (err) {
2858                        kfree(flocks);
2859                        if (err == -ENOSPC)
2860                                goto encode_again;
2861                        goto out_free;
2862                }
2863                /*
2864                 * number of encoded locks is stable, so copy to pagelist
2865                 */
2866                rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2867                                    (num_fcntl_locks+num_flock_locks) *
2868                                    sizeof(struct ceph_filelock));
2869                err = ceph_pagelist_append(pagelist, &rec, reclen);
2870                if (!err)
2871                        err = ceph_locks_to_pagelist(flocks, pagelist,
2872                                                     num_fcntl_locks,
2873                                                     num_flock_locks);
2874                kfree(flocks);
2875        } else {
2876                err = ceph_pagelist_append(pagelist, &rec, reclen);
2877        }
2878
2879        recon_state->nr_caps++;
2880out_free:
2881        kfree(path);
2882out_dput:
2883        dput(dentry);
2884        return err;
2885}
2886
2887
2888/*
2889 * If an MDS fails and recovers, clients need to reconnect in order to
2890 * reestablish shared state.  This includes all caps issued through
2891 * this session _and_ the snap_realm hierarchy.  Because it's not
2892 * clear which snap realms the mds cares about, we send everything we
 * know about; that ensures we'll then get any new info the
2894 * recovering MDS might have.
2895 *
2896 * This is a relatively heavyweight operation, but it's rare.
2897 *
2898 * called with mdsc->mutex held.
2899 */
2900static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2901                               struct ceph_mds_session *session)
2902{
2903        struct ceph_msg *reply;
2904        struct rb_node *p;
2905        int mds = session->s_mds;
2906        int err = -ENOMEM;
2907        int s_nr_caps;
2908        struct ceph_pagelist *pagelist;
2909        struct ceph_reconnect_state recon_state;
2910
2911        pr_info("mds%d reconnect start\n", mds);
2912
2913        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2914        if (!pagelist)
2915                goto fail_nopagelist;
2916        ceph_pagelist_init(pagelist);
2917
2918        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2919        if (!reply)
2920                goto fail_nomsg;
2921
2922        mutex_lock(&session->s_mutex);
2923        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2924        session->s_seq = 0;
2925
2926        dout("session %p state %s\n", session,
2927             ceph_session_state_name(session->s_state));
2928
2929        spin_lock(&session->s_gen_ttl_lock);
2930        session->s_cap_gen++;
2931        spin_unlock(&session->s_gen_ttl_lock);
2932
2933        spin_lock(&session->s_cap_lock);
2934        /* don't know if session is readonly */
2935        session->s_readonly = 0;
2936        /*
2937         * notify __ceph_remove_cap() that we are composing cap reconnect.
2938         * If a cap gets released before being added to the cap reconnect,
2939         * __ceph_remove_cap() should skip queuing the cap release.
2940         */
2941        session->s_cap_reconnect = 1;
2942        /* drop old cap expires; we're about to reestablish that state */
2943        cleanup_cap_releases(mdsc, session);
2944
2945        /* trim unused caps to reduce MDS's cache rejoin time */
2946        if (mdsc->fsc->sb->s_root)
2947                shrink_dcache_parent(mdsc->fsc->sb->s_root);
2948
2949        ceph_con_close(&session->s_con);
2950        ceph_con_open(&session->s_con,
2951                      CEPH_ENTITY_TYPE_MDS, mds,
2952                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2953
2954        /* replay unsafe requests */
2955        replay_unsafe_requests(mdsc, session);
2956
2957        down_read(&mdsc->snap_rwsem);
2958
2959        /* traverse this session's caps */
2960        s_nr_caps = session->s_nr_caps;
2961        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2962        if (err)
2963                goto fail;
2964
2965        recon_state.nr_caps = 0;
2966        recon_state.pagelist = pagelist;
2967        recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2968        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2969        if (err < 0)
2970                goto fail;
2971
2972        spin_lock(&session->s_cap_lock);
2973        session->s_cap_reconnect = 0;
2974        spin_unlock(&session->s_cap_lock);
2975
2976        /*
2977         * snaprealms.  we provide the mds with the ino, seq (version), and
2978         * parent for all of our realms.  If the mds has any newer info,
2979         * it will tell us.
2980         */
2981        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2982                struct ceph_snap_realm *realm =
2983                        rb_entry(p, struct ceph_snap_realm, node);
2984                struct ceph_mds_snaprealm_reconnect sr_rec;
2985
2986                dout(" adding snap realm %llx seq %lld parent %llx\n",
2987                     realm->ino, realm->seq, realm->parent_ino);
2988                sr_rec.ino = cpu_to_le64(realm->ino);
2989                sr_rec.seq = cpu_to_le64(realm->seq);
2990                sr_rec.parent = cpu_to_le64(realm->parent_ino);
2991                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2992                if (err)
2993                        goto fail;
2994        }
2995
2996        if (recon_state.flock)
2997                reply->hdr.version = cpu_to_le16(2);
2998
2999        /* raced with cap release? fix up the encoded cap count in place */
3000        if (s_nr_caps != recon_state.nr_caps) {
3001                struct page *page = list_first_entry(&pagelist->head,
3002                                                     struct page, lru);
3003                __le32 *addr = kmap_atomic(page);
3004                *addr = cpu_to_le32(recon_state.nr_caps);
3005                kunmap_atomic(addr);
3006        }
3007
3008        reply->hdr.data_len = cpu_to_le32(pagelist->length);
3009        ceph_msg_data_add_pagelist(reply, pagelist);
3010
3011        ceph_early_kick_flushing_caps(mdsc, session);
3012
3013        ceph_con_send(&session->s_con, reply);
3014
3015        mutex_unlock(&session->s_mutex);
3016
3017        mutex_lock(&mdsc->mutex);
3018        __wake_requests(mdsc, &session->s_waiting);
3019        mutex_unlock(&mdsc->mutex);
3020
3021        up_read(&mdsc->snap_rwsem);
3022        return;
3023
3024fail:
3025        ceph_msg_put(reply);
3026        up_read(&mdsc->snap_rwsem);
3027        mutex_unlock(&session->s_mutex);
3028fail_nomsg:
3029        ceph_pagelist_release(pagelist);
3030fail_nopagelist:
3031        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3032        return;
3033}
3034
3035
3036/*
3037 * compare old and new mdsmaps, kicking requests
3038 * and closing out old connections as necessary
3039 *
3040 * called under mdsc->mutex.
3041 */
3042static void check_new_map(struct ceph_mds_client *mdsc,
3043                          struct ceph_mdsmap *newmap,
3044                          struct ceph_mdsmap *oldmap)
3045{
3046        int i;
3047        int oldstate, newstate;
3048        struct ceph_mds_session *s;
3049
3050        dout("check_new_map new %u old %u\n",
3051             newmap->m_epoch, oldmap->m_epoch);
3052
3053        for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
3054                if (mdsc->sessions[i] == NULL)
3055                        continue;
3056                s = mdsc->sessions[i];
3057                oldstate = ceph_mdsmap_get_state(oldmap, i);
3058                newstate = ceph_mdsmap_get_state(newmap, i);
3059
3060                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3061                     i, ceph_mds_state_name(oldstate),
3062                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3063                     ceph_mds_state_name(newstate),
3064                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3065                     ceph_session_state_name(s->s_state));
3066
3067                if (i >= newmap->m_max_mds ||
3068                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
3069                           ceph_mdsmap_get_addr(newmap, i),
3070                           sizeof(struct ceph_entity_addr))) {
3071                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3072                                /* the session never opened, just close it
3073                                 * out now */
3074                                __wake_requests(mdsc, &s->s_waiting);
3075                                __unregister_session(mdsc, s);
3076                        } else {
3077                                /* just close it */
3078                                mutex_unlock(&mdsc->mutex);
3079                                mutex_lock(&s->s_mutex);
3080                                mutex_lock(&mdsc->mutex);
3081                                ceph_con_close(&s->s_con);
3082                                mutex_unlock(&s->s_mutex);
3083                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
3084                        }
3085                } else if (oldstate == newstate) {
3086                        continue;  /* nothing new with this mds */
3087                }
3088
3089                /*
3090                 * send reconnect?
3091                 */
3092                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3093                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3094                        mutex_unlock(&mdsc->mutex);
3095                        send_mds_reconnect(mdsc, s);
3096                        mutex_lock(&mdsc->mutex);
3097                }
3098
3099                /*
3100         * kick requests on any mds that has gone active.
3101                 */
3102                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3103                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3104                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3105                            oldstate != CEPH_MDS_STATE_STARTING)
3106                                pr_info("mds%d recovery completed\n", s->s_mds);
3107                        kick_requests(mdsc, i);
3108                        ceph_kick_flushing_caps(mdsc, s);
3109                        wake_up_session_caps(s, 1);
3110                }
3111        }
3112
3113        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
3114                s = mdsc->sessions[i];
3115                if (!s)
3116                        continue;
3117                if (!ceph_mdsmap_is_laggy(newmap, i))
3118                        continue;
3119                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3120                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3121                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3122                        dout(" connecting to export targets of laggy mds%d\n",
3123                             i);
3124                        __open_export_target_sessions(mdsc, s);
3125                }
3126        }
3127}
3128
3129
3130
3131/*
3132 * leases
3133 */
3134
3135/*
3136 * caller must hold session s_mutex, dentry->d_lock
3137 */
3138void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3139{
3140        struct ceph_dentry_info *di = ceph_dentry(dentry);
3141
3142        ceph_put_mds_session(di->lease_session);
3143        di->lease_session = NULL;
3144}
3145
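    /*
     * Handle a lease message from the mds: look up the dentry named in
     * the message and either drop its lease (REVOKE) or refresh its
     * expiry (RENEW).  Revocations are acked by reusing the message with
     * the action rewritten to CEPH_MDS_LEASE_REVOKE_ACK.
     */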
3146static void handle_lease(struct ceph_mds_client *mdsc,
3147                         struct ceph_mds_session *session,
3148                         struct ceph_msg *msg)
3149{
3150        struct super_block *sb = mdsc->fsc->sb;
3151        struct inode *inode;
3152        struct dentry *parent, *dentry;
3153        struct ceph_dentry_info *di;
3154        int mds = session->s_mds;
3155        struct ceph_mds_lease *h = msg->front.iov_base;
3156        u32 seq;
3157        struct ceph_vino vino;
3158        struct qstr dname;
3159        int release = 0;
3160
3161        dout("handle_lease from mds%d\n", mds);
3162
3163        /* decode */
3164        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3165                goto bad;
3166        vino.ino = le64_to_cpu(h->ino);
3167        vino.snap = CEPH_NOSNAP;
3168        seq = le32_to_cpu(h->seq);
3169        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3170        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3171        if (dname.len != get_unaligned_le32(h+1))
3172                goto bad;
3173
3174        /* lookup inode */
3175        inode = ceph_find_inode(sb, vino);
3176        dout("handle_lease %s, ino %llx %p %.*s\n",
3177             ceph_lease_op_name(h->action), vino.ino, inode,
3178             dname.len, dname.name);
3179
3180        mutex_lock(&session->s_mutex);
3181        session->s_seq++;
3182
3183        if (inode == NULL) {
3184                dout("handle_lease no inode %llx\n", vino.ino);
3185                goto release;
3186        }
3187
3188        /* dentry */
3189        parent = d_find_alias(inode);
3190        if (!parent) {
3191                dout("no parent dentry on inode %p\n", inode);
3192                WARN_ON(1);
3193                goto release;  /* hrm... */
3194        }
3195        dname.hash = full_name_hash(dname.name, dname.len);
3196        dentry = d_lookup(parent, &dname);
3197        dput(parent);
3198        if (!dentry)
3199                goto release;
3200
3201        spin_lock(&dentry->d_lock);
3202        di = ceph_dentry(dentry);
3203        switch (h->action) {
3204        case CEPH_MDS_LEASE_REVOKE:
3205                if (di->lease_session == session) {
3206                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3207                                h->seq = cpu_to_le32(di->lease_seq);
3208                        __ceph_mdsc_drop_dentry_lease(dentry);
3209                }
3210                release = 1;
3211                break;
3212
3213        case CEPH_MDS_LEASE_RENEW:
3214                if (di->lease_session == session &&
3215                    di->lease_gen == session->s_cap_gen &&
3216                    di->lease_renew_from &&
3217                    di->lease_renew_after == 0) {
3218                        unsigned long duration =
3219                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3220
3221                        di->lease_seq = seq;
3222                        dentry->d_time = di->lease_renew_from + duration;
3223                        di->lease_renew_after = di->lease_renew_from +
3224                                (duration >> 1);
3225                        di->lease_renew_from = 0;
3226                }
3227                break;
3228        }
3229        spin_unlock(&dentry->d_lock);
3230        dput(dentry);
3231
3232        if (!release)
3233                goto out;
3234
3235release:
3236        /* let's just reuse the same message */
3237        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3238        ceph_msg_get(msg);
3239        ceph_con_send(&session->s_con, msg);
3240
3241out:
3242        iput(inode);
3243        mutex_unlock(&session->s_mutex);
3244        return;
3245
3246bad:
3247        pr_err("corrupt lease message\n");
3248        ceph_msg_dump(msg);
3249}
3250
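    /*
     * Build and send a CEPH_MSG_CLIENT_LEASE message: a struct
     * ceph_mds_lease followed by a __le32 name length and the dentry
     * name bytes.
     */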
3251void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3252                              struct inode *inode,
3253                              struct dentry *dentry, char action,
3254                              u32 seq)
3255{
3256        struct ceph_msg *msg;
3257        struct ceph_mds_lease *lease;
3258        int len = sizeof(*lease) + sizeof(u32);
3259        int dnamelen = 0;
3260
3261        dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3262             inode, dentry, ceph_lease_op_name(action), session->s_mds);
3263        dnamelen = dentry->d_name.len;
3264        len += dnamelen;
3265
3266        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3267        if (!msg)
3268                return;
3269        lease = msg->front.iov_base;
3270        lease->action = action;
3271        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3272        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3273        lease->seq = cpu_to_le32(seq);
3274        put_unaligned_le32(dnamelen, lease + 1);
3275        memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3276
3277        /*
3278         * if this is a preemptive lease RELEASE, no need to
3279         * flush request stream, since the actual request will
3280         * soon follow.
3281         */
3282        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3283
3284        ceph_con_send(&session->s_con, msg);
3285}
3286
3287/*
3288 * Preemptively release a lease we expect to invalidate anyway.
3289 * Both @inode and @dentry are required; the BUG_ON()s below enforce it.
3290 */
3291void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
3292                             struct dentry *dentry)
3293{
3294        struct ceph_dentry_info *di;
3295        struct ceph_mds_session *session;
3296        u32 seq;
3297
3298        BUG_ON(inode == NULL);
3299        BUG_ON(dentry == NULL);
3300
3301        /* is dentry lease valid? */
3302        spin_lock(&dentry->d_lock);
3303        di = ceph_dentry(dentry);
3304        if (!di || !di->lease_session ||
3305            di->lease_session->s_mds < 0 ||
3306            di->lease_gen != di->lease_session->s_cap_gen ||
3307            !time_before(jiffies, dentry->d_time)) {
3308                dout("lease_release inode %p dentry %p -- "
3309                     "no lease\n",
3310                     inode, dentry);
3311                spin_unlock(&dentry->d_lock);
3312                return;
3313        }
3314
3315        /* we do have a lease on this dentry; note mds and seq */
3316        session = ceph_get_mds_session(di->lease_session);
3317        seq = di->lease_seq;
3318        __ceph_mdsc_drop_dentry_lease(dentry);
3319        spin_unlock(&dentry->d_lock);
3320
3321        dout("lease_release inode %p dentry %p to mds%d\n",
3322             inode, dentry, session->s_mds);
3323        ceph_mdsc_lease_send_msg(session, inode, dentry,
3324                                 CEPH_MDS_LEASE_RELEASE, seq);
3325        ceph_put_mds_session(session);
3326}
3327
3328/*
3329 * drop all leases (and dentry refs) in preparation for umount
3330 */
3331static void drop_leases(struct ceph_mds_client *mdsc)
3332{
3333        int i;
3334
3335        dout("drop_leases\n");
3336        mutex_lock(&mdsc->mutex);
3337        for (i = 0; i < mdsc->max_sessions; i++) {
3338                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3339                if (!s)
3340                        continue;
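                    /*
                     * note: cycling s_mutex here appears to act as a
                     * barrier against handlers still running under this
                     * session's mutex; no lease state is touched directly.
                     */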
3341                mutex_unlock(&mdsc->mutex);
3342                mutex_lock(&s->s_mutex);
3343                mutex_unlock(&s->s_mutex);
3344                ceph_put_mds_session(s);
3345                mutex_lock(&mdsc->mutex);
3346        }
3347        mutex_unlock(&mdsc->mutex);
3348}
3349
3350
3351
3352/*
3353 * delayed work -- periodically trim expired leases, renew caps with mds
3354 */
3355static void schedule_delayed(struct ceph_mds_client *mdsc)
3356{
3357        int delay = 5;
3358        unsigned hz = round_jiffies_relative(HZ * delay);
3359        schedule_delayed_work(&mdsc->delayed_work, hz);
3360}
3361
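    /*
     * Periodic housekeeping: flush delayed caps, renew caps with each
     * mds roughly every quarter of the session timeout, resend pending
     * session closes, mark unresponsive sessions hung, and push out any
     * queued cap releases.  Reschedules itself when done.
     */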
3362static void delayed_work(struct work_struct *work)
3363{
3364        int i;
3365        struct ceph_mds_client *mdsc =
3366                container_of(work, struct ceph_mds_client, delayed_work.work);
3367        int renew_interval;
3368        int renew_caps;
3369
3370        dout("mdsc delayed_work\n");
3371        ceph_check_delayed_caps(mdsc);
3372
3373        mutex_lock(&mdsc->mutex);
3374        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3375        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3376                                   mdsc->last_renew_caps);
3377        if (renew_caps)
3378                mdsc->last_renew_caps = jiffies;
3379
3380        for (i = 0; i < mdsc->max_sessions; i++) {
3381                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3382                if (s == NULL)
3383                        continue;
3384                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3385                        dout("resending session close request for mds%d\n",
3386                             s->s_mds);
3387                        request_close_session(mdsc, s);
3388                        ceph_put_mds_session(s);
3389                        continue;
3390                }
3391                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3392                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3393                                s->s_state = CEPH_MDS_SESSION_HUNG;
3394                                pr_info("mds%d hung\n", s->s_mds);
3395                        }
3396                }
3397                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3398                        /* this mds is failed or recovering, just wait */
3399                        ceph_put_mds_session(s);
3400                        continue;
3401                }
3402                mutex_unlock(&mdsc->mutex);
3403
3404                mutex_lock(&s->s_mutex);
3405                if (renew_caps)
3406                        send_renew_caps(mdsc, s);
3407                else
3408                        ceph_con_keepalive(&s->s_con);
3409                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3410                    s->s_state == CEPH_MDS_SESSION_HUNG)
3411                        ceph_send_cap_releases(mdsc, s);
3412                mutex_unlock(&s->s_mutex);
3413                ceph_put_mds_session(s);
3414
3415                mutex_lock(&mdsc->mutex);
3416        }
3417        mutex_unlock(&mdsc->mutex);
3418
3419        schedule_delayed(mdsc);
3420}
3421
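    /*
     * Allocate and initialize the per-mount mds client state, including
     * the (initially empty) mdsmap, session table, snap realm tree, and
     * cap bookkeeping.
     */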
3422int ceph_mdsc_init(struct ceph_fs_client *fsc)
3423
3424{
3425        struct ceph_mds_client *mdsc;
3426
3427        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3428        if (!mdsc)
3429                return -ENOMEM;
3430        mdsc->fsc = fsc;
3431        fsc->mdsc = mdsc;
3432        mutex_init(&mdsc->mutex);
3433        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3434        if (mdsc->mdsmap == NULL) {
3435                kfree(mdsc);
3436                return -ENOMEM;
3437        }
3438
3439        init_completion(&mdsc->safe_umount_waiters);
3440        init_waitqueue_head(&mdsc->session_close_wq);
3441        INIT_LIST_HEAD(&mdsc->waiting_for_map);
3442        mdsc->sessions = NULL;
3443        atomic_set(&mdsc->num_sessions, 0);
3444        mdsc->max_sessions = 0;
3445        mdsc->stopping = 0;
3446        mdsc->last_snap_seq = 0;
3447        init_rwsem(&mdsc->snap_rwsem);
3448        mdsc->snap_realms = RB_ROOT;
3449        INIT_LIST_HEAD(&mdsc->snap_empty);
3450        spin_lock_init(&mdsc->snap_empty_lock);
3451        mdsc->last_tid = 0;
3452        mdsc->oldest_tid = 0;
3453        mdsc->request_tree = RB_ROOT;
3454        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3455        mdsc->last_renew_caps = jiffies;
3456        INIT_LIST_HEAD(&mdsc->cap_delay_list);
3457        spin_lock_init(&mdsc->cap_delay_lock);
3458        INIT_LIST_HEAD(&mdsc->snap_flush_list);
3459        spin_lock_init(&mdsc->snap_flush_lock);
3460        mdsc->last_cap_flush_tid = 1;
3461        mdsc->cap_flush_tree = RB_ROOT;
3462        INIT_LIST_HEAD(&mdsc->cap_dirty);
3463        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3464        mdsc->num_cap_flushing = 0;
3465        spin_lock_init(&mdsc->cap_dirty_lock);
3466        init_waitqueue_head(&mdsc->cap_flushing_wq);
3467        spin_lock_init(&mdsc->dentry_lru_lock);
3468        INIT_LIST_HEAD(&mdsc->dentry_lru);
3469
3470        ceph_caps_init(mdsc);
3471        ceph_adjust_min_caps(mdsc, fsc->min_caps);
3472
3473        init_rwsem(&mdsc->pool_perm_rwsem);
3474        mdsc->pool_perm_tree = RB_ROOT;
3475
3476        return 0;
3477}
3478
3479/*
3480 * Wait for safe replies on open mds requests.  If we time out, drop
3481 * all requests from the tree to avoid dangling dentry refs.
3482 */
3483static void wait_requests(struct ceph_mds_client *mdsc)
3484{
3485        struct ceph_options *opts = mdsc->fsc->client->options;
3486        struct ceph_mds_request *req;
3487
3488        mutex_lock(&mdsc->mutex);
3489        if (__get_oldest_req(mdsc)) {
3490                mutex_unlock(&mdsc->mutex);
3491
3492                dout("wait_requests waiting for requests\n");
3493                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3494                                    ceph_timeout_jiffies(opts->mount_timeout));
3495
3496                /* tear down remaining requests */
3497                mutex_lock(&mdsc->mutex);
3498                while ((req = __get_oldest_req(mdsc))) {
3499                        dout("wait_requests timed out on tid %llu\n",
3500                             req->r_tid);
3501                        __unregister_request(mdsc, req);
3502                }
3503        }
3504        mutex_unlock(&mdsc->mutex);
3505        dout("wait_requests done\n");
3506}
3507
3508/*
3509 * called before mount is ro, and before dentries are torn down.
3510 * (hmm, does this still race with new lookups?)
3511 */
3512void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3513{
3514        dout("pre_umount\n");
3515        mdsc->stopping = 1;
3516
3517        drop_leases(mdsc);
3518        ceph_flush_dirty_caps(mdsc);
3519        wait_requests(mdsc);
3520
3521        /*
3522         * wait for reply handlers to drop their request refs and
3523         * their inode/dcache refs
3524         */
3525        ceph_msgr_flush();
3526}
3527
3528/*
3529 * wait for all write mds requests to flush.
3530 */
3531static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3532{
3533        struct ceph_mds_request *req = NULL, *nextreq;
3534        struct rb_node *n;
3535
3536        mutex_lock(&mdsc->mutex);
3537        dout("wait_unsafe_requests want %lld\n", want_tid);
3538restart:
3539        req = __get_oldest_req(mdsc);
3540        while (req && req->r_tid <= want_tid) {
3541                /* find next request */
3542                n = rb_next(&req->r_node);
3543                if (n)
3544                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3545                else
3546                        nextreq = NULL;
3547                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3548                    (req->r_op & CEPH_MDS_OP_WRITE)) {
3549                        /* write op */
3550                        ceph_mdsc_get_request(req);
3551                        if (nextreq)
3552                                ceph_mdsc_get_request(nextreq);
3553                        mutex_unlock(&mdsc->mutex);
3554                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3555                             req->r_tid, want_tid);
3556                        wait_for_completion(&req->r_safe_completion);
3557                        mutex_lock(&mdsc->mutex);
3558                        ceph_mdsc_put_request(req);
3559                        if (!nextreq)
3560                                break;  /* no next request existed, so we're done */
3561                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
3562                                /* next request was removed from tree */
3563                                ceph_mdsc_put_request(nextreq);
3564                                goto restart;
3565                        }
3566                        ceph_mdsc_put_request(nextreq);  /* won't go away */
3567                }
3568                req = nextreq;
3569        }
3570        mutex_unlock(&mdsc->mutex);
3571        dout("wait_unsafe_requests done\n");
3572}
3573
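    /*
     * Sample the current request tid, cap flush tid, and snap sequence,
     * then wait until all metadata write requests, cap flushes, and snap
     * flushes up to those points have completed.
     */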
3574void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3575{
3576        u64 want_tid, want_flush, want_snap;
3577
3578        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3579                return;
3580
3581        dout("sync\n");
3582        mutex_lock(&mdsc->mutex);
3583        want_tid = mdsc->last_tid;
3584        mutex_unlock(&mdsc->mutex);
3585
3586        ceph_flush_dirty_caps(mdsc);
3587        spin_lock(&mdsc->cap_dirty_lock);
3588        want_flush = mdsc->last_cap_flush_tid;
3589        spin_unlock(&mdsc->cap_dirty_lock);
3590
3591        down_read(&mdsc->snap_rwsem);
3592        want_snap = mdsc->last_snap_seq;
3593        up_read(&mdsc->snap_rwsem);
3594
3595        dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
3596             want_tid, want_flush, want_snap);
3597
3598        wait_unsafe_requests(mdsc, want_tid);
3599        wait_caps_flush(mdsc, want_flush, want_snap);
3600}
3601
3602/*
3603 * true if all sessions are closed, or we force unmount
3604 */
3605static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3606{
3607        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3608                return true;
3609        return atomic_read(&mdsc->num_sessions) == 0;
3610}
3611
3612/*
3613 * called after sb is ro.
3614 */
3615void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3616{
3617        struct ceph_options *opts = mdsc->fsc->client->options;
3618        struct ceph_mds_session *session;
3619        int i;
3620
3621        dout("close_sessions\n");
3622
3623        /* close sessions */
3624        mutex_lock(&mdsc->mutex);
3625        for (i = 0; i < mdsc->max_sessions; i++) {
3626                session = __ceph_lookup_mds_session(mdsc, i);
3627                if (!session)
3628                        continue;
3629                mutex_unlock(&mdsc->mutex);
3630                mutex_lock(&session->s_mutex);
3631                __close_session(mdsc, session);
3632                mutex_unlock(&session->s_mutex);
3633                ceph_put_mds_session(session);
3634                mutex_lock(&mdsc->mutex);
3635        }
3636        mutex_unlock(&mdsc->mutex);
3637
3638        dout("waiting for sessions to close\n");
3639        wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3640                           ceph_timeout_jiffies(opts->mount_timeout));
3641
3642        /* tear down remaining sessions */
3643        mutex_lock(&mdsc->mutex);
3644        for (i = 0; i < mdsc->max_sessions; i++) {
3645                if (mdsc->sessions[i]) {
3646                        session = get_session(mdsc->sessions[i]);
3647                        __unregister_session(mdsc, session);
3648                        mutex_unlock(&mdsc->mutex);
3649                        mutex_lock(&session->s_mutex);
3650                        remove_session_caps(session);
3651                        mutex_unlock(&session->s_mutex);
3652                        ceph_put_mds_session(session);
3653                        mutex_lock(&mdsc->mutex);
3654                }
3655        }
3656        WARN_ON(!list_empty(&mdsc->cap_delay_list));
3657        mutex_unlock(&mdsc->mutex);
3658
3659        ceph_cleanup_empty_realms(mdsc);
3660
3661        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3662
3663        dout("stopped\n");
3664}
3665
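    /*
     * Forcibly close every session, tearing down requests and caps for
     * sessions that reach the closing state, and wake any waiters so a
     * forced unmount can make progress.
     */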
3666void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3667{
3668        struct ceph_mds_session *session;
3669        int mds;
3670
3671        dout("force umount\n");
3672
3673        mutex_lock(&mdsc->mutex);
3674        for (mds = 0; mds < mdsc->max_sessions; mds++) {
3675                session = __ceph_lookup_mds_session(mdsc, mds);
3676                if (!session)
3677                        continue;
3678                mutex_unlock(&mdsc->mutex);
3679                mutex_lock(&session->s_mutex);
3680                __close_session(mdsc, session);
3681                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3682                        cleanup_session_requests(mdsc, session);
3683                        remove_session_caps(session);
3684                }
3685                mutex_unlock(&session->s_mutex);
3686                ceph_put_mds_session(session);
3687                mutex_lock(&mdsc->mutex);
3688                kick_requests(mdsc, mds);
3689        }
3690        __wake_requests(mdsc, &mdsc->waiting_for_map);
3691        mutex_unlock(&mdsc->mutex);
3692}
3693
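    /*
     * Final teardown: stop the delayed work and release the mdsmap,
     * session array, cap bookkeeping, and pool permission cache.
     */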
3694static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3695{
3696        dout("stop\n");
3697        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3698        if (mdsc->mdsmap)
3699                ceph_mdsmap_destroy(mdsc->mdsmap);
3700        kfree(mdsc->sessions);
3701        ceph_caps_finalize(mdsc);
3702        ceph_pool_perm_destroy(mdsc);
3703}
3704
3705void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3706{
3707        struct ceph_mds_client *mdsc = fsc->mdsc;
3708
3709        dout("mdsc_destroy %p\n", mdsc);
3710        ceph_mdsc_stop(mdsc);
3711
3712        /* flush out any connection work with references to us */
3713        ceph_msgr_flush();
3714
3715        fsc->mdsc = NULL;
3716        kfree(mdsc);
3717        dout("mdsc_destroy %p done\n", mdsc);
3718}
3719
3720
3721/*
3722 * handle mds map update.
3723 */
3724void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3725{
3726        u32 epoch;
3727        u32 maplen;
3728        void *p = msg->front.iov_base;
3729        void *end = p + msg->front.iov_len;
3730        struct ceph_mdsmap *newmap, *oldmap;
3731        struct ceph_fsid fsid;
3732        int err = -EINVAL;
3733
3734        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3735        ceph_decode_copy(&p, &fsid, sizeof(fsid));
3736        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3737                return;
3738        epoch = ceph_decode_32(&p);
3739        maplen = ceph_decode_32(&p);
3740        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3741
3742        /* do we need it? */
3743        mutex_lock(&mdsc->mutex);
3744        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3745                dout("handle_map epoch %u <= our %u\n",
3746                     epoch, mdsc->mdsmap->m_epoch);
3747                mutex_unlock(&mdsc->mutex);
3748                return;
3749        }
3750
3751        newmap = ceph_mdsmap_decode(&p, end);
3752        if (IS_ERR(newmap)) {
3753                err = PTR_ERR(newmap);
3754                goto bad_unlock;
3755        }
3756
3757        /* swap into place */
3758        if (mdsc->mdsmap) {
3759                oldmap = mdsc->mdsmap;
3760                mdsc->mdsmap = newmap;
3761                check_new_map(mdsc, newmap, oldmap);
3762                ceph_mdsmap_destroy(oldmap);
3763        } else {
3764                mdsc->mdsmap = newmap;  /* first mds map */
3765        }
3766        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3767
3768        __wake_requests(mdsc, &mdsc->waiting_for_map);
3769        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3770                          mdsc->mdsmap->m_epoch);
3771
3772        mutex_unlock(&mdsc->mutex);
3773        schedule_delayed(mdsc);
3774        return;
3775
3776bad_unlock:
3777        mutex_unlock(&mdsc->mutex);
3778bad:
3779        pr_err("error decoding mdsmap %d\n", err);
3780        return;
3781}
3782
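    /*
     * Connection reference counting just piggybacks on the session
     * reference count: con_get/con_put take and drop session refs.
     */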
3783static struct ceph_connection *con_get(struct ceph_connection *con)
3784{
3785        struct ceph_mds_session *s = con->private;
3786
3787        if (get_session(s)) {
3788                dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3789                return con;
3790        }
3791        dout("mdsc con_get %p FAIL\n", s);
3792        return NULL;
3793}
3794
3795static void con_put(struct ceph_connection *con)
3796{
3797        struct ceph_mds_session *s = con->private;
3798
3799        dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3800        ceph_put_mds_session(s);
3801}
3802
3803/*
3804 * if the client is unresponsive for long enough, the mds will kill
3805 * the session entirely.
3806 */
3807static void peer_reset(struct ceph_connection *con)
3808{
3809        struct ceph_mds_session *s = con->private;
3810        struct ceph_mds_client *mdsc = s->s_mdsc;
3811
3812        pr_warn("mds%d closed our session\n", s->s_mds);
3813        send_mds_reconnect(mdsc, s);
3814}
3815
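    /*
     * Dispatch an incoming message to its handler, after verifying that
     * the originating session is still registered.
     */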
3816static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3817{
3818        struct ceph_mds_session *s = con->private;
3819        struct ceph_mds_client *mdsc = s->s_mdsc;
3820        int type = le16_to_cpu(msg->hdr.type);
3821
3822        mutex_lock(&mdsc->mutex);
3823        if (__verify_registered_session(mdsc, s) < 0) {
3824                mutex_unlock(&mdsc->mutex);
3825                goto out;
3826        }
3827        mutex_unlock(&mdsc->mutex);
3828
3829        switch (type) {
3830        case CEPH_MSG_MDS_MAP:
3831                ceph_mdsc_handle_map(mdsc, msg);
3832                break;
3833        case CEPH_MSG_CLIENT_SESSION:
3834                handle_session(s, msg);
3835                break;
3836        case CEPH_MSG_CLIENT_REPLY:
3837                handle_reply(s, msg);
3838                break;
3839        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3840                handle_forward(mdsc, s, msg);
3841                break;
3842        case CEPH_MSG_CLIENT_CAPS:
3843                ceph_handle_caps(s, msg);
3844                break;
3845        case CEPH_MSG_CLIENT_SNAP:
3846                ceph_handle_snap(mdsc, s, msg);
3847                break;
3848        case CEPH_MSG_CLIENT_LEASE:
3849                handle_lease(mdsc, s, msg);
3850                break;
3851
3852        default:
3853                pr_err("received unknown message type %d %s\n", type,
3854                       ceph_msg_type_name(type));
3855        }
3856out:
3857        ceph_msg_put(msg);
3858}
3859
3860/*
3861 * authentication
3862 */
3863
3864/*
3865 * Note: returned pointer is the address of a structure that's
3866 * managed separately.  Caller must *not* attempt to free it.
3867 */
3868static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3869                                        int *proto, int force_new)
3870{
3871        struct ceph_mds_session *s = con->private;
3872        struct ceph_mds_client *mdsc = s->s_mdsc;
3873        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3874        struct ceph_auth_handshake *auth = &s->s_auth;
3875
3876        if (force_new && auth->authorizer) {
3877                ceph_auth_destroy_authorizer(auth->authorizer);
3878                auth->authorizer = NULL;
3879        }
3880        if (!auth->authorizer) {
3881                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3882                                                      auth);
3883                if (ret)
3884                        return ERR_PTR(ret);
3885        } else {
3886                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3887                                                      auth);
3888                if (ret)
3889                        return ERR_PTR(ret);
3890        }
3891        *proto = ac->protocol;
3892
3893        return auth;
3894}
3895
3896
3897static int verify_authorizer_reply(struct ceph_connection *con)
3898{
3899        struct ceph_mds_session *s = con->private;
3900        struct ceph_mds_client *mdsc = s->s_mdsc;
3901        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3902
3903        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
3904}
3905
3906static int invalidate_authorizer(struct ceph_connection *con)
3907{
3908        struct ceph_mds_session *s = con->private;
3909        struct ceph_mds_client *mdsc = s->s_mdsc;
3910        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3911
3912        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3913
3914        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3915}
3916
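    /*
     * Allocate a buffer for an incoming message, reusing one that was
     * already set up on the connection if present.
     */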
3917static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3918                                struct ceph_msg_header *hdr, int *skip)
3919{
3920        struct ceph_msg *msg;
3921        int type = (int) le16_to_cpu(hdr->type);
3922        int front_len = (int) le32_to_cpu(hdr->front_len);
3923
3924        if (con->in_msg)
3925                return con->in_msg;
3926
3927        *skip = 0;
3928        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3929        if (!msg) {
3930                pr_err("unable to allocate msg type %d len %d\n",
3931                       type, front_len);
3932                return NULL;
3933        }
3934
3935        return msg;
3936}
3937
3938static int mds_sign_message(struct ceph_msg *msg)
3939{
3940       struct ceph_mds_session *s = msg->con->private;
3941       struct ceph_auth_handshake *auth = &s->s_auth;
3942
3943       return ceph_auth_sign_message(auth, msg);
3944}
3945
3946static int mds_check_message_signature(struct ceph_msg *msg)
3947{
3948       struct ceph_mds_session *s = msg->con->private;
3949       struct ceph_auth_handshake *auth = &s->s_auth;
3950
3951       return ceph_auth_check_message_signature(auth, msg);
3952}
3953
3954static const struct ceph_connection_operations mds_con_ops = {
3955        .get = con_get,
3956        .put = con_put,
3957        .dispatch = dispatch,
3958        .get_authorizer = get_authorizer,
3959        .verify_authorizer_reply = verify_authorizer_reply,
3960        .invalidate_authorizer = invalidate_authorizer,
3961        .peer_reset = peer_reset,
3962        .alloc_msg = mds_alloc_msg,
3963        .sign_message = mds_sign_message,
3964        .check_message_signature = mds_check_message_signature,
3965};
3966
3967/* eof */
3968