linux/fs/ceph/mds_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/utsname.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        int nr_caps;
        struct ceph_pagelist *pagelist;
        bool flock;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = -EIO;

        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        if (features & CEPH_FEATURE_DIRLAYOUTHASH)
                ceph_decode_copy_safe(p, end, &info->dir_layout,
                                      sizeof(info->dir_layout), bad);
        else
                memset(&info->dir_layout, 0, sizeof(info->dir_layout));

        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
        } else
                info->inline_version = CEPH_INLINE_NONE;

        return 0;
bad:
        return err;
}
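
/*
 * For reference: the ceph_decode_*_safe() helpers from
 * <linux/ceph/decode.h> used above all follow the same pattern: check
 * that enough bytes remain between *p and end, copy the value out
 * (converting from little-endian), advance *p, and jump to the given
 * label on a short buffer.  A rough sketch of consuming one
 * length-prefixed blob this way (hypothetical fields, for illustration
 * only):
 *
 *	u32 len;
 *	void *data;
 *
 *	ceph_decode_32_safe(p, end, len, bad);	(read u32 len, or goto bad)
 *	ceph_decode_need(p, end, len, bad);	(ensure len bytes remain)
 *	data = *p;		(borrow a pointer into the message buffer)
 *	*p += len;		(advance past the blob)
 */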

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                if (unlikely(*p + sizeof(*info->dirfrag) > end))
                        goto bad;
                info->dirfrag = *p;
                *p += sizeof(*info->dirfrag) +
                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
                if (unlikely(*p > end))
                        goto bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;
                info->dlease = *p;
                *p += sizeof(*info->dlease);
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        info->dir_dir = *p;
        if (*p + sizeof(*info->dir_dir) > end)
                goto bad;
        *p += sizeof(*info->dir_dir) +
                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
        if (*p > end)
                goto bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        info->dir_end = ceph_decode_8(p);
        info->dir_complete = ceph_decode_8(p);
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_in);
        info->dir_dname = (void *)(info->dir_in + num);
        info->dir_dname_len = (void *)(info->dir_dname + num);
        info->dir_dlease = (void *)(info->dir_dname_len + num);
        if ((unsigned long)(info->dir_dlease + num) >
            (unsigned long)info->dir_in + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                /* dentry */
                ceph_decode_need(p, end, sizeof(u32)*2, bad);
                info->dir_dname_len[i] = ceph_decode_32(p);
                ceph_decode_need(p, end, info->dir_dname_len[i], bad);
                info->dir_dname[i] = *p;
                *p += info->dir_dname_len[i];
                dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
                     info->dir_dname[i]);
                info->dir_dlease[i] = *p;
                *p += sizeof(struct ceph_mds_reply_lease);

                /* inode */
                err = parse_reply_info_in(p, end, &info->dir_in[i], features);
                if (err < 0)
                        goto out_bad;
                i++;
                num--;
        }

done:
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;
        *p += sizeof(*info->filelock_reply);

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
                if (*p == end) {
                        info->has_create_ino = false;
                } else {
                        info->has_create_ino = true;
                        info->ino = ceph_decode_64(p);
                }
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (info->head->op == CEPH_MDS_OP_READDIR ||
                 info->head->op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_dir(p, end, info, features);
        else if (info->head->op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}
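
/*
 * For orientation, the reply payload parsed above consists of four
 * consecutive sections in the message front (a sketch inferred from the
 * parsing code, not an authoritative wire-format spec):
 *
 *	struct ceph_mds_reply_head
 *	u32 trace_len,    trace bytes     (dentry/inode trace)
 *	u32 extra_len,    extra bytes     (readdir/filelock/create results)
 *	u32 snapblob_len, snapblob bytes  (snap context, decoded elsewhere)
 */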

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_in)
                return;
        free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        default: return "???";
        }
}
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (atomic_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(
                                s->s_mdsc->fsc->client->monc.auth,
                                s->s_auth.authorizer);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        struct ceph_mds_session *session;

        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
                return NULL;
        session = mdsc->sessions[mds];
        dout("lookup_mds_session %p %d\n", session,
             atomic_read(&session->s_ref));
        get_session(session);
        return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions)
                return false;
        return mdsc->sessions[mds];
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->m_max_mds)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 0;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        atomic_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

        dout("register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds+1);
                struct ceph_mds_session **sa;

                dout("register_session realloc to %d\n", newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (sa == NULL)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }
        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}
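
/*
 * The sessions[] array above grows to the next power of two that can
 * hold index 'mds'.  get_count_order(n) is ceil(log2(n)), so, as a
 * worked example, registering mds 5 while max_sessions == 4 gives
 * newmax = 1 << get_count_order(6) = 1 << 3 = 8 slots; the existing
 * pointers are copied across and the old array freed.
 */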

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_locked_dir)
                ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
}

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
                                             u64 tid)
{
        struct ceph_mds_request *req;
        struct rb_node *n = mdsc->request_tree.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_mds_request, r_node);
                if (tid < req->r_tid)
                        n = n->rb_left;
                else if (tid > req->r_tid)
                        n = n->rb_right;
                else {
                        ceph_mdsc_get_request(req);
                        return req;
                }
        }
        return NULL;
}

static void __insert_request(struct ceph_mds_client *mdsc,
                             struct ceph_mds_request *new)
{
        struct rb_node **p = &mdsc->request_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_mds_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_mds_request, r_node);
                if (new->r_tid < req->r_tid)
                        p = &(*p)->rb_left;
                else if (new->r_tid > req->r_tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->r_node, parent, p);
        rb_insert_color(&new->r_node, &mdsc->request_tree);
}
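
/*
 * __lookup_request()/__insert_request() are the standard
 * <linux/rbtree.h> idiom: walk rb_node links comparing keys (tids,
 * here), then rb_link_node() + rb_insert_color() to splice in the new
 * node and rebalance.  Duplicate tids are impossible by construction,
 * since tids come from ++mdsc->last_tid below, hence the BUG() on an
 * equal key.
 */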

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                  req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        __insert_request(mdsc, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                ihold(dir);
                spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        rb_erase(&req->r_node, &mdsc->request_tree);
        RB_CLEAR_NODE(&req->r_node);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);

                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static struct dentry *get_nonsnap_parent(struct dentry *dentry)
{
        /*
         * we don't need to worry about protecting the d_parent access
         * here because we never rename inside the snapped namespace
         * except to resplice to another snapdir, and either the old or new
         * result is a valid result.
         */
        while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
                dentry = dentry->d_parent;
        return dentry;
}

static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = req->r_direct_is_hash;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent = req->r_dentry->d_parent;
                struct inode *dir = d_inode(parent);

                if (dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs! */
                        inode = d_inode(req->r_dentry);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        struct dentry *dn = get_nonsnap_parent(parent);
                        inode = d_inode(dn);
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = dir;
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        }
                }
        }

        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        return mds;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, look for the authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        return mds;
                        }
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int metadata_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        void *p;

        const char *metadata[][2] = {
                {"hostname", utsname()->nodename},
                {"kernel_version", utsname()->release},
                {"entity_id", opt->name ? opt->name : ""},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        metadata_bytes = 4;  /* map length */
        for (i = 0; metadata[i][0] != NULL; ++i) {
                metadata_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v2
         */
        msg->hdr.version = cpu_to_le16(2);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p = msg->front.iov_base + sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0] != NULL; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        return msg;
}
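
/*
 * Worked example of the map<string, string> encoding above (values are
 * illustrative): a single entry {"hostname": "node1"} would serialize,
 * little-endian, as
 *
 *	01 00 00 00                u32 map length (1 entry)
 *	08 00 00 00  "hostname"    u32 key length, key bytes
 *	05 00 00 00  "node1"       u32 value length, value bytes
 *
 * i.e. 4 + (8 + 8 + 5) bytes, matching the "8 + strlen(k) + strlen(v)"
 * accounting in the sizing loop (8 covers the two u32 length prefixes).
 */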

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING)
                __open_session(mdsc, session);

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->m_max_mds)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                if (!IS_ERR(ts))
                        ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

/* caller holds s_cap_lock, we drop it */
static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
        __releases(session->s_cap_lock)
{
        LIST_HEAD(tmp_list);
        list_splice_init(&session->s_cap_releases, &tmp_list);
        session->s_num_cap_releases = 0;
        spin_unlock(&session->s_cap_lock);

        dout("cleanup_cap_releases mds%d\n", session->s_mds);
        while (!list_empty(&tmp_list)) {
                struct ceph_cap *cap;
                /* zero out the in-progress message */
                cap = list_first_entry(&tmp_list,
                                        struct ceph_cap, session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
        }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                                     struct ceph_mds_session *session)
{
        struct ceph_mds_request *req;
        struct rb_node *p;

        dout("cleanup_session_requests mds%d\n", session->s_mds);
        mutex_lock(&mdsc->mutex);
        while (!list_empty(&session->s_unsafe)) {
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                list_del_init(&req->r_unsafe_item);
                pr_warn_ratelimited(" dropping unsafe request %llu\n",
                                    req->r_tid);
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
        p = rb_first(&mdsc->request_tree);
        while (p) {
                req = rb_entry(p, struct ceph_mds_request, r_node);
                p = rb_next(p);
                if (req->r_session &&
                    req->r_session->s_mds == session->s_mds)
                        req->r_attempts = 0;
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                if (last_inode) {
                        iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                if (cap->ci == NULL) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        if (cap->queue_release) {
                                list_add_tail(&cap->session_caps,
                                              &session->s_cap_releases);
                                session->s_num_cap_releases++;
                        } else {
                                old_cap = cap;  /* put_cap it w/o locks held */
                        }
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}
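
/*
 * The s_cap_iterator cursor is what makes the walk above safe: while it
 * points at a cap, a concurrent __ceph_remove_cap() (in caps.c) is
 * expected to detach the cap from its inode but leave it on the session
 * list, clearing cap->ci to signal deferred removal.  The "cap->ci ==
 * NULL" branch above then finishes the removal once the callback has
 * run.  The igrab()/iput() dance similarly keeps the current inode
 * alive across the callback without holding s_cap_lock.
 */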

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        LIST_HEAD(to_remove);
        int drop = 0;

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
                struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;

                while (true) {
                        struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
                        if (!n)
                                break;
                        cf = rb_entry(n, struct ceph_cap_flush, i_node);
                        rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
                        list_add(&cf->list, &to_remove);
                }

                spin_lock(&mdsc->cap_dirty_lock);

                list_for_each_entry(cf, &to_remove, list)
                        rb_erase(&cf->g_node, &mdsc->cap_flush_tree);

                if (!list_empty(&ci->i_dirty_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = 1;
                }
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = 1;
                }
                spin_unlock(&mdsc->cap_dirty_lock);

                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
                        list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }
        }
        spin_unlock(&ci->i_ceph_lock);
        while (!list_empty(&to_remove)) {
                struct ceph_cap_flush *cf;
                cf = list_first_entry(&to_remove,
                                      struct ceph_cap_flush, list);
                list_del(&cf->list);
                ceph_free_cap_flush(cf);
        }
        while (drop--)
                iput(inode);
        return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, NULL);

        spin_lock(&session->s_cap_lock);
        if (session->s_nr_caps > 0) {
                struct super_block *sb = session->s_mdsc->fsc->sb;
                struct inode *inode;
                struct ceph_cap *cap, *prev = NULL;
                struct ceph_vino vino;
                /*
                 * iterate_session_caps() skips inodes that are being
                 * deleted, so we need to wait until deletions are
                 * complete.  __wait_on_freeing_inode() is designed for
                 * the job, but it is not exported, so use the inode
                 * lookup function to access it.
                 */
                while (!list_empty(&session->s_caps)) {
                        cap = list_entry(session->s_caps.next,
                                         struct ceph_cap, session_caps);
                        if (cap == prev)
                                break;
                        prev = cap;
                        vino = cap->ci->i_vino;
                        spin_unlock(&session->s_cap_lock);

                        inode = ceph_find_inode(sb, vino);
                        iput(inode);

                        spin_lock(&session->s_cap_lock);
                }
        }

        /* drop cap expires and unlock s_cap_lock */
        cleanup_cap_releases(session->s_mdsc, session);

        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
}

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
                              void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);

        wake_up_all(&ci->i_cap_wq);
        if (arg) {
                spin_lock(&ci->i_ceph_lock);
                ci->i_wanted_max_size = 0;
                ci->i_requested_max_size = 0;
                spin_unlock(&ci->i_ceph_lock);
        }
        return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session,
                                 int reconnect)
{
        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
        iterate_session_caps(session, wake_up_session_cb,
                             (void *)(unsigned long)reconnect);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
                           struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int state;

        if (time_after_eq(jiffies, session->s_cap_ttl) &&
            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
                pr_info("mds%d caps stale\n", session->s_mds);
        session->s_renew_requested = jiffies;

        /* do not try to renew caps until a recovering mds has reconnected
         * with its clients. */
        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
        if (state < CEPH_MDS_STATE_RECONNECT) {
                dout("send_renew_caps ignoring mds%d (%s)\n",
                     session->s_mds, ceph_mds_state_name(state));
                return 0;
        }

        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
                ceph_mds_state_name(state));
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
                                 ++session->s_renew_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
                             struct ceph_mds_session *session, u64 seq)
{
        struct ceph_msg *msg;

        dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state), seq);
        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session, int is_renew)
{
        int was_stale;
        int wake = 0;

        spin_lock(&session->s_cap_lock);
        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

        session->s_cap_ttl = session->s_renew_requested +
                mdsc->mdsmap->m_session_timeout*HZ;

        if (was_stale) {
                if (time_before(jiffies, session->s_cap_ttl)) {
                        pr_info("mds%d caps renewed\n", session->s_mds);
                        wake = 1;
                } else {
                        pr_info("mds%d caps still stale\n", session->s_mds);
                }
        }
        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
             time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
        spin_unlock(&session->s_cap_lock);

        if (wake)
                wake_up_session_caps(session, 0);
}
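
/*
 * Cap TTL arithmetic, by example (illustrative numbers): with a mdsmap
 * session timeout of 60s and HZ=250, a renew requested at jiffies J
 * yields s_cap_ttl = J + 60*250 = J + 15000 jiffies.  Because the ttl
 * is anchored at s_renew_requested, it is conservative: the caps expire
 * one timeout after the renew request went out, not after the ack came
 * back.
 */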

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session)
{
        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
                return 0;
        session->s_state = CEPH_MDS_SESSION_CLOSING;
        return request_close_session(mdsc, session);
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
        struct ceph_mds_session *session = arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int used, wanted, oissued, mine;

        if (session->s_trim_caps <= 0)
                return -1;

        spin_lock(&ci->i_ceph_lock);
        mine = cap->issued | cap->implemented;
        used = __ceph_caps_used(ci);
        wanted = __ceph_caps_file_wanted(ci);
        oissued = __ceph_caps_issued_other(ci, cap);

        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
             ceph_cap_string(used), ceph_cap_string(wanted));
        if (cap == ci->i_auth_cap) {
                if (ci->i_dirty_caps || ci->i_flushing_caps ||
                    !list_empty(&ci->i_cap_snaps))
                        goto out;
                if ((used | wanted) & CEPH_CAP_ANY_WR)
                        goto out;
        }
        if ((used | wanted) & ~oissued & mine)
                goto out;   /* we need these caps */

        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap, true);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&ci->i_ceph_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
                     inode, cap, atomic_read(&inode->i_count));
                return 0;
        }

out:
        spin_unlock(&ci->i_ceph_lock);
        return 0;
}

/*
 * Trim session cap count down to some max number.
 */
static int trim_caps(struct ceph_mds_client *mdsc,
                     struct ceph_mds_session *session,
                     int max_caps)
{
        int trim_caps = session->s_nr_caps - max_caps;

        dout("trim_caps mds%d start: %d / %d, trim %d\n",
             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
        if (trim_caps > 0) {
                session->s_trim_caps = trim_caps;
                iterate_session_caps(session, trim_caps_cb, session);
                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
                     session->s_mds, session->s_nr_caps, max_caps,
                        trim_caps - session->s_trim_caps);
                session->s_trim_caps = 0;
        }

        ceph_send_cap_releases(mdsc, session);
        return 0;
}
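
/*
 * Example (illustrative numbers): if the MDS asks us to shrink to
 * max_caps = 800 while s_nr_caps = 1000, trim_caps() sets
 * s_trim_caps = 200 and walks the cap list; trim_caps_cb() decrements
 * the budget for each cap it removes or whose dentries it prunes, and
 * returns -1 to stop the walk once the budget is exhausted.  Caps that
 * are in use, wanted, or dirty on the auth MDS are skipped, so fewer
 * than 200 may actually be trimmed.
 */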
1477
1478static int check_capsnap_flush(struct ceph_inode_info *ci,
1479                               u64 want_snap_seq)
1480{
1481        int ret = 1;
1482        spin_lock(&ci->i_ceph_lock);
1483        if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
1484                struct ceph_cap_snap *capsnap =
1485                        list_first_entry(&ci->i_cap_snaps,
1486                                         struct ceph_cap_snap, ci_item);
1487                ret = capsnap->follows >= want_snap_seq;
1488        }
1489        spin_unlock(&ci->i_ceph_lock);
1490        return ret;
1491}
1492
1493static int check_caps_flush(struct ceph_mds_client *mdsc,
1494                            u64 want_flush_tid)
1495{
1496        struct rb_node *n;
1497        struct ceph_cap_flush *cf;
1498        int ret = 1;
1499
1500        spin_lock(&mdsc->cap_dirty_lock);
1501        n = rb_first(&mdsc->cap_flush_tree);
1502        cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
1503        if (cf && cf->tid <= want_flush_tid) {
1504                dout("check_caps_flush still flushing tid %llu <= %llu\n",
1505                     cf->tid, want_flush_tid);
1506                ret = 0;
1507        }
1508        spin_unlock(&mdsc->cap_dirty_lock);
1509        return ret;
1510}
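
/*
 * Illustration of the check above, assuming (as the wait in
 * wait_caps_flush() implies) that entries leave cap_flush_tree once
 * their flushes complete: the tree's first (lowest-tid) entry is the
 * oldest flush still in flight.  With in-flight tids {7, 9, 12} and
 * want_flush_tid = 8, rb_first yields tid 7 <= 8, so we keep waiting;
 * once 7 completes, rb_first yields 9 > 8 and the wait is done.
 */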
1511
1512/*
1513 * wait for all in-flight cap flushes to complete through
1514 * want_flush_tid, and for all in-flight snap flushes to complete
1515 * through want_snap_seq.
1516 */
1517static void wait_caps_flush(struct ceph_mds_client *mdsc,
1518                            u64 want_flush_tid, u64 want_snap_seq)
1519{
1520        int mds;
1521
1522        dout("wait_caps_flush want flush %llu snap %llu\n",
1523             want_flush_tid, want_snap_seq);
1524        mutex_lock(&mdsc->mutex);
1525        for (mds = 0; mds < mdsc->max_sessions; ) {
1526                struct ceph_mds_session *session = mdsc->sessions[mds];
1527                struct inode *inode = NULL;
1528
1529                if (!session) {
1530                        mds++;
1531                        continue;
1532                }
1533                get_session(session);
1534                mutex_unlock(&mdsc->mutex);
1535
1536                mutex_lock(&session->s_mutex);
1537                if (!list_empty(&session->s_cap_snaps_flushing)) {
1538                        struct ceph_cap_snap *capsnap =
1539                                list_first_entry(&session->s_cap_snaps_flushing,
1540                                                 struct ceph_cap_snap,
1541                                                 flushing_item);
1542                        struct ceph_inode_info *ci = capsnap->ci;
1543                        if (!check_capsnap_flush(ci, want_snap_seq)) {
1544                dout("wait_caps_flush still flushing snap %p "
1545                                     "follows %lld <= %lld to mds%d\n",
1546                                     &ci->vfs_inode, capsnap->follows,
1547                                     want_snap_seq, mds);
1548                                inode = igrab(&ci->vfs_inode);
1549                        }
1550                }
1551                mutex_unlock(&session->s_mutex);
1552                ceph_put_mds_session(session);
1553
1554                if (inode) {
1555                        wait_event(mdsc->cap_flushing_wq,
1556                                   check_capsnap_flush(ceph_inode(inode),
1557                                                       want_snap_seq));
1558                        iput(inode);
1559                } else {
1560                        mds++;
1561                }
1562
1563                mutex_lock(&mdsc->mutex);
1564        }
1565        mutex_unlock(&mdsc->mutex);
1566
1567        wait_event(mdsc->cap_flushing_wq,
1568                   check_caps_flush(mdsc, want_flush_tid));
1569
1570        dout("wait_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1571}
1572
1573/*
1574 * called under s_mutex
1575 */
1576void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1577                            struct ceph_mds_session *session)
1578{
1579        struct ceph_msg *msg = NULL;
1580        struct ceph_mds_cap_release *head;
1581        struct ceph_mds_cap_item *item;
1582        struct ceph_cap *cap;
1583        LIST_HEAD(tmp_list);
1584        int num_cap_releases;
1585
1586        spin_lock(&session->s_cap_lock);
1587again:
1588        list_splice_init(&session->s_cap_releases, &tmp_list);
1589        num_cap_releases = session->s_num_cap_releases;
1590        session->s_num_cap_releases = 0;
1591        spin_unlock(&session->s_cap_lock);
1592
1593        while (!list_empty(&tmp_list)) {
1594                if (!msg) {
1595                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1596                                        PAGE_CACHE_SIZE, GFP_NOFS, false);
1597                        if (!msg)
1598                                goto out_err;
1599                        head = msg->front.iov_base;
1600                        head->num = cpu_to_le32(0);
1601                        msg->front.iov_len = sizeof(*head);
1602                }
1603                cap = list_first_entry(&tmp_list, struct ceph_cap,
1604                                        session_caps);
1605                list_del(&cap->session_caps);
1606                num_cap_releases--;
1607
1608                head = msg->front.iov_base;
1609                le32_add_cpu(&head->num, 1);
1610                item = msg->front.iov_base + msg->front.iov_len;
1611                item->ino = cpu_to_le64(cap->cap_ino);
1612                item->cap_id = cpu_to_le64(cap->cap_id);
1613                item->migrate_seq = cpu_to_le32(cap->mseq);
1614                item->seq = cpu_to_le32(cap->issue_seq);
1615                msg->front.iov_len += sizeof(*item);
1616
1617                ceph_put_cap(mdsc, cap);
1618
1619                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1620                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1621                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1622                        ceph_con_send(&session->s_con, msg);
1623                        msg = NULL;
1624                }
1625        }
1626
1627        BUG_ON(num_cap_releases != 0);
1628
1629        spin_lock(&session->s_cap_lock);
1630        if (!list_empty(&session->s_cap_releases))
1631                goto again;
1632        spin_unlock(&session->s_cap_lock);
1633
1634        if (msg) {
1635                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1636                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1637                ceph_con_send(&session->s_con, msg);
1638        }
1639        return;
1640out_err:
1641        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1642                session->s_mds);
1643        spin_lock(&session->s_cap_lock);
1644        list_splice(&tmp_list, &session->s_cap_releases);
1645        session->s_num_cap_releases += num_cap_releases;
1646        spin_unlock(&session->s_cap_lock);
1647}
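
/*
 * Shape of the CEPH_MSG_CLIENT_CAPRELEASE front section assembled
 * above (a sketch of this function's encoding, not a wire-format
 * reference):
 *
 *        struct ceph_mds_cap_release head;        // head.num = N
 *        struct ceph_mds_cap_item item[N];        // ino, cap_id,
 *                                                 // migrate_seq, seq
 *
 * A full message is sent each time N reaches CEPH_CAPS_PER_RELEASE,
 * and one final partial message is sent for the remainder.
 */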
1648
1649/*
1650 * requests
1651 */
1652
1653int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1654                                    struct inode *dir)
1655{
1656        struct ceph_inode_info *ci = ceph_inode(dir);
1657        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1658        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1659        size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1660                      sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1661        int order, num_entries;
1662
1663        spin_lock(&ci->i_ceph_lock);
1664        num_entries = ci->i_files + ci->i_subdirs;
1665        spin_unlock(&ci->i_ceph_lock);
1666        num_entries = max(num_entries, 1);
1667        num_entries = min(num_entries, opt->max_readdir);
1668
1669        order = get_order(size * num_entries);
1670        while (order >= 0) {
1671                rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
1672                                                        __GFP_NOWARN,
1673                                                        order);
1674                if (rinfo->dir_in)
1675                        break;
1676                order--;
1677        }
1678        if (!rinfo->dir_in)
1679                return -ENOMEM;
1680
1681        num_entries = (PAGE_SIZE << order) / size;
1682        num_entries = min(num_entries, opt->max_readdir);
1683
1684        rinfo->dir_buf_size = PAGE_SIZE << order;
1685        req->r_num_caps = num_entries + 1;
1686        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1687        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1688        return 0;
1689}
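
/*
 * Worked example with hypothetical numbers: say the per-entry size
 * computed above is 56 bytes, the directory shows i_files + i_subdirs
 * = 1000, and PAGE_SIZE is 4096.  Then get_order(56 * 1000) = 4
 * (14 pages rounded up to 16), and if the order-4 allocation succeeds
 * the buffer holds (4096 << 4) / 56 = 1170 entries, clamped to
 * opt->max_readdir.  On allocation failure the loop retries at order
 * 3, 2, ..., trading a smaller buffer (and more readdir round trips)
 * for reliability.
 */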
1690
1691/*
1692 * Create an mds request.
1693 */
1694struct ceph_mds_request *
1695ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1696{
1697        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1698
1699        if (!req)
1700                return ERR_PTR(-ENOMEM);
1701
1702        mutex_init(&req->r_fill_mutex);
1703        req->r_mdsc = mdsc;
1704        req->r_started = jiffies;
1705        req->r_resend_mds = -1;
1706        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1707        req->r_fmode = -1;
1708        kref_init(&req->r_kref);
1709        INIT_LIST_HEAD(&req->r_wait);
1710        init_completion(&req->r_completion);
1711        init_completion(&req->r_safe_completion);
1712        INIT_LIST_HEAD(&req->r_unsafe_item);
1713
1714        req->r_stamp = CURRENT_TIME;
1715
1716        req->r_op = op;
1717        req->r_direct_mode = mode;
1718        return req;
1719}
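
/*
 * Typical request lifecycle, as a hedged sketch modelled on callers
 * elsewhere in fs/ceph (the locals and the op are illustrative):
 *
 *        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP,
 *                                       USE_ANY_MDS);
 *        if (IS_ERR(req))
 *                return PTR_ERR(req);
 *        req->r_dentry = dget(dentry);
 *        req->r_num_caps = 2;
 *        err = ceph_mdsc_do_request(mdsc, dir, req);
 *        ...
 *        ceph_mdsc_put_request(req);        // drop the ref from kref_init
 */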
1720
1721/*
1722 * return the oldest (lowest-tid) request in the request tree, or NULL if none.
1723 *
1724 * called under mdsc->mutex.
1725 */
1726static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1727{
1728        if (RB_EMPTY_ROOT(&mdsc->request_tree))
1729                return NULL;
1730        return rb_entry(rb_first(&mdsc->request_tree),
1731                        struct ceph_mds_request, r_node);
1732}
1733
1734static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1735{
1736        return mdsc->oldest_tid;
1737}
1738
1739/*
1740 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1741 * on build_path_from_dentry in fs/cifs/dir.c.
1742 *
1743 * If @stop_on_nosnap, generate path relative to the first non-snapped
1744 * inode.
1745 *
1746 * Encode hidden .snap dirs as a double /, i.e.
1747 *   foo/.snap/bar -> foo//bar
1748 */
1749char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1750                           int stop_on_nosnap)
1751{
1752        struct dentry *temp;
1753        char *path;
1754        int len, pos;
1755        unsigned seq;
1756
1757        if (dentry == NULL)
1758                return ERR_PTR(-EINVAL);
1759
1760retry:
1761        len = 0;
1762        seq = read_seqbegin(&rename_lock);
1763        rcu_read_lock();
1764        for (temp = dentry; !IS_ROOT(temp);) {
1765                struct inode *inode = d_inode(temp);
1766                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1767                        len++;  /* slash only */
1768                else if (stop_on_nosnap && inode &&
1769                         ceph_snap(inode) == CEPH_NOSNAP)
1770                        break;
1771                else
1772                        len += 1 + temp->d_name.len;
1773                temp = temp->d_parent;
1774        }
1775        rcu_read_unlock();
1776        if (len)
1777                len--;  /* no leading '/' */
1778
1779        path = kmalloc(len+1, GFP_NOFS);
1780        if (path == NULL)
1781                return ERR_PTR(-ENOMEM);
1782        pos = len;
1783        path[pos] = 0;  /* trailing null */
1784        rcu_read_lock();
1785        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1786                struct inode *inode;
1787
1788                spin_lock(&temp->d_lock);
1789                inode = d_inode(temp);
1790                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1791                        dout("build_path path+%d: %p SNAPDIR\n",
1792                             pos, temp);
1793                } else if (stop_on_nosnap && inode &&
1794                           ceph_snap(inode) == CEPH_NOSNAP) {
1795                        spin_unlock(&temp->d_lock);
1796                        break;
1797                } else {
1798                        pos -= temp->d_name.len;
1799                        if (pos < 0) {
1800                                spin_unlock(&temp->d_lock);
1801                                break;
1802                        }
1803                        strncpy(path + pos, temp->d_name.name,
1804                                temp->d_name.len);
1805                }
1806                spin_unlock(&temp->d_lock);
1807                if (pos)
1808                        path[--pos] = '/';
1809                temp = temp->d_parent;
1810        }
1811        rcu_read_unlock();
1812        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1813                pr_err("build_path did not end path lookup where "
1814                       "expected, namelen is %d, pos is %d\n", len, pos);
1815                /* presumably this is only possible if racing with a
1816                   rename of one of the parent directories (we cannot
1817                   lock the dentries above us to prevent this, but
1818                   retrying should be harmless) */
1819                kfree(path);
1820                goto retry;
1821        }
1822
1823        *base = ceph_ino(d_inode(temp));
1824        *plen = len;
1825        dout("build_path on %p %d built %llx '%.*s'\n",
1826             dentry, d_count(dentry), *base, len, path);
1827        return path;
1828}
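
/*
 * Example of the encoding above, with stop_on_nosnap == 0: for a
 * dentry at foo/.snap/bar, the first pass counts "bar" (1 + 3), the
 * snapdir (1, slash only) and "foo" (1 + 3), minus the leading '/',
 * giving len = 8; the second pass fills the buffer in from the tail
 * and returns path = "foo//bar" with *base set to the ino of the root
 * of the walk.
 */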
1829
1830static int build_dentry_path(struct dentry *dentry,
1831                             const char **ppath, int *ppathlen, u64 *pino,
1832                             int *pfreepath)
1833{
1834        char *path;
1835
1836        if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) {
1837                *pino = ceph_ino(d_inode(dentry->d_parent));
1838                *ppath = dentry->d_name.name;
1839                *ppathlen = dentry->d_name.len;
1840                return 0;
1841        }
1842        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1843        if (IS_ERR(path))
1844                return PTR_ERR(path);
1845        *ppath = path;
1846        *pfreepath = 1;
1847        return 0;
1848}
1849
1850static int build_inode_path(struct inode *inode,
1851                            const char **ppath, int *ppathlen, u64 *pino,
1852                            int *pfreepath)
1853{
1854        struct dentry *dentry;
1855        char *path;
1856
1857        if (ceph_snap(inode) == CEPH_NOSNAP) {
1858                *pino = ceph_ino(inode);
1859                *ppathlen = 0;
1860                return 0;
1861        }
1862        dentry = d_find_alias(inode);
1863        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1864        dput(dentry);
1865        if (IS_ERR(path))
1866                return PTR_ERR(path);
1867        *ppath = path;
1868        *pfreepath = 1;
1869        return 0;
1870}
1871
1872/*
1873 * request arguments may be specified via an inode *, a dentry *, or
1874 * an explicit ino+path.
1875 */
1876static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1877                                  const char *rpath, u64 rino,
1878                                  const char **ppath, int *pathlen,
1879                                  u64 *ino, int *freepath)
1880{
1881        int r = 0;
1882
1883        if (rinode) {
1884                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1885                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1886                     ceph_snap(rinode));
1887        } else if (rdentry) {
1888                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1889                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1890                     *ppath);
1891        } else if (rpath || rino) {
1892                *ino = rino;
1893                *ppath = rpath;
1894                *pathlen = rpath ? strlen(rpath) : 0;
1895                dout(" path %.*s\n", *pathlen, rpath);
1896        }
1897
1898        return r;
1899}
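
/*
 * Concretely, for the two path endpoints of a request: endpoint 1
 * comes from r_inode, r_dentry or r_path1/r_ino1, and endpoint 2
 * (only meaningful for ops like rename or link) from r_old_dentry or
 * r_path2/r_ino2, as wired up in create_request_message() below.  If
 * none of the three sources is set, the outputs keep their zeroed
 * defaults.
 */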
1900
1901/*
1902 * called under mdsc->mutex
1903 */
1904static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1905                                               struct ceph_mds_request *req,
1906                                               int mds, bool drop_cap_releases)
1907{
1908        struct ceph_msg *msg;
1909        struct ceph_mds_request_head *head;
1910        const char *path1 = NULL;
1911        const char *path2 = NULL;
1912        u64 ino1 = 0, ino2 = 0;
1913        int pathlen1 = 0, pathlen2 = 0;
1914        int freepath1 = 0, freepath2 = 0;
1915        int len;
1916        u16 releases;
1917        void *p, *end;
1918        int ret;
1919
1920        ret = set_request_path_attr(req->r_inode, req->r_dentry,
1921                              req->r_path1, req->r_ino1.ino,
1922                              &path1, &pathlen1, &ino1, &freepath1);
1923        if (ret < 0) {
1924                msg = ERR_PTR(ret);
1925                goto out;
1926        }
1927
1928        ret = set_request_path_attr(NULL, req->r_old_dentry,
1929                              req->r_path2, req->r_ino2.ino,
1930                              &path2, &pathlen2, &ino2, &freepath2);
1931        if (ret < 0) {
1932                msg = ERR_PTR(ret);
1933                goto out_free1;
1934        }
1935
1936        len = sizeof(*head) +
1937                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1938                sizeof(struct timespec);
1939
1940        /* calculate (max) length for cap releases */
1941        len += sizeof(struct ceph_mds_request_release) *
1942                (!!req->r_inode_drop + !!req->r_dentry_drop +
1943                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1944        if (req->r_dentry_drop)
1945                len += req->r_dentry->d_name.len;
1946        if (req->r_old_dentry_drop)
1947                len += req->r_old_dentry->d_name.len;
1948
1949        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1950        if (!msg) {
1951                msg = ERR_PTR(-ENOMEM);
1952                goto out_free2;
1953        }
1954
1955        msg->hdr.version = cpu_to_le16(2);
1956        msg->hdr.tid = cpu_to_le64(req->r_tid);
1957
1958        head = msg->front.iov_base;
1959        p = msg->front.iov_base + sizeof(*head);
1960        end = msg->front.iov_base + msg->front.iov_len;
1961
1962        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1963        head->op = cpu_to_le32(req->r_op);
1964        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1965        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1966        head->args = req->r_args;
1967
1968        ceph_encode_filepath(&p, end, ino1, path1);
1969        ceph_encode_filepath(&p, end, ino2, path2);
1970
1971        /* make note of release offset, in case we need to replay */
1972        req->r_request_release_offset = p - msg->front.iov_base;
1973
1974        /* cap releases */
1975        releases = 0;
1976        if (req->r_inode_drop)
1977                releases += ceph_encode_inode_release(&p,
1978                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
1979                      mds, req->r_inode_drop, req->r_inode_unless, 0);
1980        if (req->r_dentry_drop)
1981                releases += ceph_encode_dentry_release(&p, req->r_dentry,
1982                       mds, req->r_dentry_drop, req->r_dentry_unless);
1983        if (req->r_old_dentry_drop)
1984                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1985                       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1986        if (req->r_old_inode_drop)
1987                releases += ceph_encode_inode_release(&p,
1988                      d_inode(req->r_old_dentry),
1989                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1990
1991        if (drop_cap_releases) {
1992                releases = 0;
1993                p = msg->front.iov_base + req->r_request_release_offset;
1994        }
1995
1996        head->num_releases = cpu_to_le16(releases);
1997
1998        /* time stamp */
1999        {
2000                struct ceph_timespec ts;
2001                ceph_encode_timespec(&ts, &req->r_stamp);
2002                ceph_encode_copy(&p, &ts, sizeof(ts));
2003        }
2004
2005        BUG_ON(p > end);
2006        msg->front.iov_len = p - msg->front.iov_base;
2007        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2008
2009        if (req->r_pagelist) {
2010                struct ceph_pagelist *pagelist = req->r_pagelist;
2011                atomic_inc(&pagelist->refcnt);
2012                ceph_msg_data_add_pagelist(msg, pagelist);
2013                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2014        } else {
2015                msg->hdr.data_len = 0;
2016        }
2017
2018        msg->hdr.data_off = cpu_to_le16(0);
2019
2020out_free2:
2021        if (freepath2)
2022                kfree((char *)path2);
2023out_free1:
2024        if (freepath1)
2025                kfree((char *)path1);
2026out:
2027        return msg;
2028}
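
/*
 * Front-section layout produced above, in encoding order (a sketch,
 * not an on-wire specification):
 *
 *        struct ceph_mds_request_head head;
 *        filepath path1;                     // ino1 + path string
 *        filepath path2;                     // ino2 + path string
 *        releases[head.num_releases];        // inode/dentry cap releases
 *        struct ceph_timespec stamp;         // r_stamp
 *
 * r_request_release_offset remembers where the releases start so that
 * a replay can drop them (see __prepare_send_request()).
 */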
2029
2030/*
2031 * called under mdsc->mutex if error, under no mutex if
2032 * success.
2033 */
2034static void complete_request(struct ceph_mds_client *mdsc,
2035                             struct ceph_mds_request *req)
2036{
2037        if (req->r_callback)
2038                req->r_callback(mdsc, req);
2039        else
2040                complete_all(&req->r_completion);
2041}
2042
2043/*
2044 * called under mdsc->mutex
2045 */
2046static int __prepare_send_request(struct ceph_mds_client *mdsc,
2047                                  struct ceph_mds_request *req,
2048                                  int mds, bool drop_cap_releases)
2049{
2050        struct ceph_mds_request_head *rhead;
2051        struct ceph_msg *msg;
2052        int flags = 0;
2053
2054        req->r_attempts++;
2055        if (req->r_inode) {
2056                struct ceph_cap *cap =
2057                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2058
2059                if (cap)
2060                        req->r_sent_on_mseq = cap->mseq;
2061                else
2062                        req->r_sent_on_mseq = -1;
2063        }
2064        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2065             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2066
2067        if (req->r_got_unsafe) {
2068                void *p;
2069                /*
2070                 * Replay.  Do not regenerate message (and rebuild
2071                 * paths, etc.); just use the original message.
2072                 * Rebuilding paths will break for renames because
2073                 * d_move mangles the src name.
2074                 */
2075                msg = req->r_request;
2076                rhead = msg->front.iov_base;
2077
2078                flags = le32_to_cpu(rhead->flags);
2079                flags |= CEPH_MDS_FLAG_REPLAY;
2080                rhead->flags = cpu_to_le32(flags);
2081
2082                if (req->r_target_inode)
2083                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2084
2085                rhead->num_retry = req->r_attempts - 1;
2086
2087                /* remove cap/dentry releases from message */
2088                rhead->num_releases = 0;
2089
2090                /* time stamp */
2091                p = msg->front.iov_base + req->r_request_release_offset;
2092                {
2093                        struct ceph_timespec ts;
2094                        ceph_encode_timespec(&ts, &req->r_stamp);
2095                        ceph_encode_copy(&p, &ts, sizeof(ts));
2096                }
2097
2098                msg->front.iov_len = p - msg->front.iov_base;
2099                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2100                return 0;
2101        }
2102
2103        if (req->r_request) {
2104                ceph_msg_put(req->r_request);
2105                req->r_request = NULL;
2106        }
2107        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2108        if (IS_ERR(msg)) {
2109                req->r_err = PTR_ERR(msg);
2110                return PTR_ERR(msg);
2111        }
2112        req->r_request = msg;
2113
2114        rhead = msg->front.iov_base;
2115        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2116        if (req->r_got_unsafe)
2117                flags |= CEPH_MDS_FLAG_REPLAY;
2118        if (req->r_locked_dir)
2119                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2120        rhead->flags = cpu_to_le32(flags);
2121        rhead->num_fwd = req->r_num_fwd;
2122        rhead->num_retry = req->r_attempts - 1;
2123        rhead->ino = 0;
2124
2125        dout(" r_locked_dir = %p\n", req->r_locked_dir);
2126        return 0;
2127}
2128
2129/*
2130 * send request, or put it on the appropriate wait list.
2131 */
2132static int __do_request(struct ceph_mds_client *mdsc,
2133                        struct ceph_mds_request *req)
2134{
2135        struct ceph_mds_session *session = NULL;
2136        int mds = -1;
2137        int err = 0;
2138
2139        if (req->r_err || req->r_got_result) {
2140                if (req->r_aborted)
2141                        __unregister_request(mdsc, req);
2142                goto out;
2143        }
2144
2145        if (req->r_timeout &&
2146            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2147                dout("do_request timed out\n");
2148                err = -EIO;
2149                goto finish;
2150        }
2151        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2152                dout("do_request forced umount\n");
2153                err = -EIO;
2154                goto finish;
2155        }
2156
2157        put_request_session(req);
2158
2159        mds = __choose_mds(mdsc, req);
2160        if (mds < 0 ||
2161            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2162                dout("do_request no mds or not active, waiting for map\n");
2163                list_add(&req->r_wait, &mdsc->waiting_for_map);
2164                goto out;
2165        }
2166
2167        /* get, open session */
2168        session = __ceph_lookup_mds_session(mdsc, mds);
2169        if (!session) {
2170                session = register_session(mdsc, mds);
2171                if (IS_ERR(session)) {
2172                        err = PTR_ERR(session);
2173                        goto finish;
2174                }
2175        }
2176        req->r_session = get_session(session);
2177
2178        dout("do_request mds%d session %p state %s\n", mds, session,
2179             ceph_session_state_name(session->s_state));
2180        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2181            session->s_state != CEPH_MDS_SESSION_HUNG) {
2182                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2183                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2184                        __open_session(mdsc, session);
2185                list_add(&req->r_wait, &session->s_waiting);
2186                goto out_session;
2187        }
2188
2189        /* send request */
2190        req->r_resend_mds = -1;   /* forget any previous mds hint */
2191
2192        if (req->r_request_started == 0)   /* note request start time */
2193                req->r_request_started = jiffies;
2194
2195        err = __prepare_send_request(mdsc, req, mds, false);
2196        if (!err) {
2197                ceph_msg_get(req->r_request);
2198                ceph_con_send(&session->s_con, req->r_request);
2199        }
2200
2201out_session:
2202        ceph_put_mds_session(session);
2203finish:
2204        if (err) {
2205                dout("__do_request early error %d\n", err);
2206                req->r_err = err;
2207                complete_request(mdsc, req);
2208                __unregister_request(mdsc, req);
2209        }
2210out:
2211        return err;
2212}
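
/*
 * In brief, the decision flow above:
 *
 *        already completed or aborted   -> out
 *        timed out / forced umount      -> finish with -EIO
 *        no active mds for the request  -> park on mdsc->waiting_for_map
 *        session not OPEN or HUNG       -> (maybe) __open_session(),
 *                                          park on session->s_waiting
 *        otherwise                      -> __prepare_send_request() and
 *                                          ceph_con_send()
 *
 * Parked requests are re-driven later through __wake_requests().
 */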
2213
2214/*
2215 * called under mdsc->mutex
2216 */
2217static void __wake_requests(struct ceph_mds_client *mdsc,
2218                            struct list_head *head)
2219{
2220        struct ceph_mds_request *req;
2221        LIST_HEAD(tmp_list);
2222
2223        list_splice_init(head, &tmp_list);
2224
2225        while (!list_empty(&tmp_list)) {
2226                req = list_entry(tmp_list.next,
2227                                 struct ceph_mds_request, r_wait);
2228                list_del_init(&req->r_wait);
2229                dout(" wake request %p tid %llu\n", req, req->r_tid);
2230                __do_request(mdsc, req);
2231        }
2232}
2233
2234/*
2235 * Wake up threads with requests pending for @mds, so that they can
2236 * resubmit their requests to a possibly different mds.
2237 */
2238static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2239{
2240        struct ceph_mds_request *req;
2241        struct rb_node *p = rb_first(&mdsc->request_tree);
2242
2243        dout("kick_requests mds%d\n", mds);
2244        while (p) {
2245                req = rb_entry(p, struct ceph_mds_request, r_node);
2246                p = rb_next(p);
2247                if (req->r_got_unsafe)
2248                        continue;
2249                if (req->r_attempts > 0)
2250                        continue; /* only new requests */
2251                if (req->r_session &&
2252                    req->r_session->s_mds == mds) {
2253                        dout(" kicking tid %llu\n", req->r_tid);
2254                        list_del_init(&req->r_wait);
2255                        __do_request(mdsc, req);
2256                }
2257        }
2258}
2259
2260void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2261                              struct ceph_mds_request *req)
2262{
2263        dout("submit_request on %p\n", req);
2264        mutex_lock(&mdsc->mutex);
2265        __register_request(mdsc, req, NULL);
2266        __do_request(mdsc, req);
2267        mutex_unlock(&mdsc->mutex);
2268}
2269
2270/*
2271 * Synchronously perform an mds request, taking care of all of the
2272 * session setup, forwarding, and retry details.
2273 */
2274int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2275                         struct inode *dir,
2276                         struct ceph_mds_request *req)
2277{
2278        int err;
2279
2280        dout("do_request on %p\n", req);
2281
2282        /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
2283        if (req->r_inode)
2284                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2285        if (req->r_locked_dir)
2286                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
2287        if (req->r_old_dentry_dir)
2288                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2289                                  CEPH_CAP_PIN);
2290
2291        /* issue */
2292        mutex_lock(&mdsc->mutex);
2293        __register_request(mdsc, req, dir);
2294        __do_request(mdsc, req);
2295
2296        if (req->r_err) {
2297                err = req->r_err;
2298                goto out;
2299        }
2300
2301        /* wait */
2302        mutex_unlock(&mdsc->mutex);
2303        dout("do_request waiting\n");
2304        if (!req->r_timeout && req->r_wait_for_completion) {
2305                err = req->r_wait_for_completion(mdsc, req);
2306        } else {
2307                long timeleft = wait_for_completion_killable_timeout(
2308                                        &req->r_completion,
2309                                        ceph_timeout_jiffies(req->r_timeout));
2310                if (timeleft > 0)
2311                        err = 0;
2312                else if (!timeleft)
2313                        err = -EIO;  /* timed out */
2314                else
2315                        err = timeleft;  /* killed */
2316        }
2317        dout("do_request waited, got %d\n", err);
2318        mutex_lock(&mdsc->mutex);
2319
2320        /* only abort if we didn't race with a real reply */
2321        if (req->r_got_result) {
2322                err = le32_to_cpu(req->r_reply_info.head->result);
2323        } else if (err < 0) {
2324                dout("aborted request %lld with %d\n", req->r_tid, err);
2325
2326                /*
2327                 * ensure we aren't running concurrently with
2328                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2329                 * rely on locks (dir mutex) held by our caller.
2330                 */
2331                mutex_lock(&req->r_fill_mutex);
2332                req->r_err = err;
2333                req->r_aborted = true;
2334                mutex_unlock(&req->r_fill_mutex);
2335
2336                if (req->r_locked_dir &&
2337                    (req->r_op & CEPH_MDS_OP_WRITE))
2338                        ceph_invalidate_dir_request(req);
2339        } else {
2340                err = req->r_err;
2341        }
2342
2343out:
2344        mutex_unlock(&mdsc->mutex);
2345        dout("do_request %p done, result %d\n", req, err);
2346        return err;
2347}
2348
2349/*
2350 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2351 * namespace request.
2352 */
2353void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2354{
2355        struct inode *inode = req->r_locked_dir;
2356
2357        dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2358
2359        ceph_dir_clear_complete(inode);
2360        if (req->r_dentry)
2361                ceph_invalidate_dentry_lease(req->r_dentry);
2362        if (req->r_old_dentry)
2363                ceph_invalidate_dentry_lease(req->r_old_dentry);
2364}
2365
2366/*
2367 * Handle mds reply.
2368 *
2369 * We take the session mutex and parse and process the reply immediately.
2370 * This preserves the logical ordering of replies, capabilities, etc., sent
2371 * by the MDS as they are applied to our local cache.
2372 */
2373static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2374{
2375        struct ceph_mds_client *mdsc = session->s_mdsc;
2376        struct ceph_mds_request *req;
2377        struct ceph_mds_reply_head *head = msg->front.iov_base;
2378        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2379        struct ceph_snap_realm *realm;
2380        u64 tid;
2381        int err, result;
2382        int mds = session->s_mds;
2383
2384        if (msg->front.iov_len < sizeof(*head)) {
2385                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2386                ceph_msg_dump(msg);
2387                return;
2388        }
2389
2390        /* get request, session */
2391        tid = le64_to_cpu(msg->hdr.tid);
2392        mutex_lock(&mdsc->mutex);
2393        req = __lookup_request(mdsc, tid);
2394        if (!req) {
2395                dout("handle_reply on unknown tid %llu\n", tid);
2396                mutex_unlock(&mdsc->mutex);
2397                return;
2398        }
2399        dout("handle_reply %p\n", req);
2400
2401        /* correct session? */
2402        if (req->r_session != session) {
2403                pr_err("mdsc_handle_reply got %llu on session mds%d"
2404                       " not mds%d\n", tid, session->s_mds,
2405                       req->r_session ? req->r_session->s_mds : -1);
2406                mutex_unlock(&mdsc->mutex);
2407                goto out;
2408        }
2409
2410        /* dup? */
2411        if ((req->r_got_unsafe && !head->safe) ||
2412            (req->r_got_safe && head->safe)) {
2413                pr_warn("got a dup %s reply on %llu from mds%d\n",
2414                           head->safe ? "safe" : "unsafe", tid, mds);
2415                mutex_unlock(&mdsc->mutex);
2416                goto out;
2417        }
2418        if (req->r_got_safe) {
2419                pr_warn("got unsafe after safe on %llu from mds%d\n",
2420                           tid, mds);
2421                mutex_unlock(&mdsc->mutex);
2422                goto out;
2423        }
2424
2425        result = le32_to_cpu(head->result);
2426
2427        /*
2428         * Handle an ESTALE:
2429         *  - if we're not talking to the authority, resend to it;
2430         *  - if the authority has changed while we weren't looking,
2431         *    resend to the new authority;
2432         *  - otherwise we just have to return ESTALE to the caller.
2433         */
2434        if (result == -ESTALE) {
2435                dout("got ESTALE on request %llu\n", req->r_tid);
2436                req->r_resend_mds = -1;
2437                if (req->r_direct_mode != USE_AUTH_MDS) {
2438                        dout("not using auth, setting for that now\n");
2439                        req->r_direct_mode = USE_AUTH_MDS;
2440                        __do_request(mdsc, req);
2441                        mutex_unlock(&mdsc->mutex);
2442                        goto out;
2443                } else  {
2444                        int mds = __choose_mds(mdsc, req);
2445                        if (mds >= 0 && mds != req->r_session->s_mds) {
2446                                dout("but auth changed, so resending\n");
2447                                __do_request(mdsc, req);
2448                                mutex_unlock(&mdsc->mutex);
2449                                goto out;
2450                        }
2451                }
2452                dout("have to return ESTALE on request %llu\n", req->r_tid);
2453        }
2454
2455
2456        if (head->safe) {
2457                req->r_got_safe = true;
2458                __unregister_request(mdsc, req);
2459
2460                if (req->r_got_unsafe) {
2461                        /*
2462                         * We already handled the unsafe response, now do the
2463                         * cleanup.  No need to examine the response; the MDS
2464                         * doesn't include any result info in the safe
2465                         * response.  And even if it did, there is nothing
2466                         * useful we could do with a revised return value.
2467                         */
2468                        dout("got safe reply %llu, mds%d\n", tid, mds);
2469                        list_del_init(&req->r_unsafe_item);
2470
2471                        /* last unsafe request during umount? */
2472                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2473                                complete_all(&mdsc->safe_umount_waiters);
2474                        mutex_unlock(&mdsc->mutex);
2475                        goto out;
2476                }
2477        } else {
2478                req->r_got_unsafe = true;
2479                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2480        }
2481
2482        dout("handle_reply tid %lld result %d\n", tid, result);
2483        rinfo = &req->r_reply_info;
2484        err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2485        mutex_unlock(&mdsc->mutex);
2486
2487        mutex_lock(&session->s_mutex);
2488        if (err < 0) {
2489                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2490                ceph_msg_dump(msg);
2491                goto out_err;
2492        }
2493
2494        /* snap trace */
2495        realm = NULL;
2496        if (rinfo->snapblob_len) {
2497                down_write(&mdsc->snap_rwsem);
2498                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2499                                rinfo->snapblob + rinfo->snapblob_len,
2500                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2501                                &realm);
2502                downgrade_write(&mdsc->snap_rwsem);
2503        } else {
2504                down_read(&mdsc->snap_rwsem);
2505        }
2506
2507        /* insert trace into our cache */
2508        mutex_lock(&req->r_fill_mutex);
2509        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2510        if (err == 0) {
2511                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2512                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2513                        ceph_readdir_prepopulate(req, req->r_session);
2514                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2515        }
2516        mutex_unlock(&req->r_fill_mutex);
2517
2518        up_read(&mdsc->snap_rwsem);
2519        if (realm)
2520                ceph_put_snap_realm(mdsc, realm);
2521out_err:
2522        mutex_lock(&mdsc->mutex);
2523        if (!req->r_aborted) {
2524                if (err) {
2525                        req->r_err = err;
2526                } else {
2527                        req->r_reply = ceph_msg_get(msg);
2528                        req->r_got_result = true;
2529                }
2530        } else {
2531                dout("reply arrived after request %lld was aborted\n", tid);
2532        }
2533        mutex_unlock(&mdsc->mutex);
2534
2535        mutex_unlock(&session->s_mutex);
2536
2537        /* kick calling process */
2538        complete_request(mdsc, req);
2539out:
2540        ceph_mdsc_put_request(req);
2541        return;
2542}
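
/*
 * Rough timeline of the two-phase reply handling above for a
 * metadata-modifying request:
 *
 *        client -> mds : request (tid T)
 *        mds -> client : unsafe reply    r_got_unsafe set, trace filled
 *                                        in, req kept on s_unsafe
 *                  ... mds commits the update ...
 *        mds -> client : safe reply      r_got_safe set, request
 *                                        unregistered
 *
 * Read-only operations skip the unsafe step and receive a single safe
 * reply.
 */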
2543
2544
2545
2546/*
2547 * handle mds notification that our request has been forwarded.
2548 */
2549static void handle_forward(struct ceph_mds_client *mdsc,
2550                           struct ceph_mds_session *session,
2551                           struct ceph_msg *msg)
2552{
2553        struct ceph_mds_request *req;
2554        u64 tid = le64_to_cpu(msg->hdr.tid);
2555        u32 next_mds;
2556        u32 fwd_seq;
2557        int err = -EINVAL;
2558        void *p = msg->front.iov_base;
2559        void *end = p + msg->front.iov_len;
2560
2561        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2562        next_mds = ceph_decode_32(&p);
2563        fwd_seq = ceph_decode_32(&p);
2564
2565        mutex_lock(&mdsc->mutex);
2566        req = __lookup_request(mdsc, tid);
2567        if (!req) {
2568                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2569                goto out;  /* dup reply? */
2570        }
2571
2572        if (req->r_aborted) {
2573                dout("forward tid %llu aborted, unregistering\n", tid);
2574                __unregister_request(mdsc, req);
2575        } else if (fwd_seq <= req->r_num_fwd) {
2576                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2577                     tid, next_mds, req->r_num_fwd, fwd_seq);
2578        } else {
2579                /* resend. forward race not possible; mds would drop */
2580                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2581                BUG_ON(req->r_err);
2582                BUG_ON(req->r_got_result);
2583                req->r_attempts = 0;
2584                req->r_num_fwd = fwd_seq;
2585                req->r_resend_mds = next_mds;
2586                put_request_session(req);
2587                __do_request(mdsc, req);
2588        }
2589        ceph_mdsc_put_request(req);
2590out:
2591        mutex_unlock(&mdsc->mutex);
2592        return;
2593
2594bad:
2595        pr_err("mdsc_handle_forward decode error err=%d\n", err);
2596}
2597
2598/*
2599 * handle an mds session control message
2600 */
2601static void handle_session(struct ceph_mds_session *session,
2602                           struct ceph_msg *msg)
2603{
2604        struct ceph_mds_client *mdsc = session->s_mdsc;
2605        u32 op;
2606        u64 seq;
2607        int mds = session->s_mds;
2608        struct ceph_mds_session_head *h = msg->front.iov_base;
2609        int wake = 0;
2610
2611        /* decode */
2612        if (msg->front.iov_len != sizeof(*h))
2613                goto bad;
2614        op = le32_to_cpu(h->op);
2615        seq = le64_to_cpu(h->seq);
2616
2617        mutex_lock(&mdsc->mutex);
2618        if (op == CEPH_SESSION_CLOSE)
2619                __unregister_session(mdsc, session);
2620        /* FIXME: this ttl calculation is generous */
2621        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2622        mutex_unlock(&mdsc->mutex);
2623
2624        mutex_lock(&session->s_mutex);
2625
2626        dout("handle_session mds%d %s %p state %s seq %llu\n",
2627             mds, ceph_session_op_name(op), session,
2628             ceph_session_state_name(session->s_state), seq);
2629
2630        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2631                session->s_state = CEPH_MDS_SESSION_OPEN;
2632                pr_info("mds%d came back\n", session->s_mds);
2633        }
2634
2635        switch (op) {
2636        case CEPH_SESSION_OPEN:
2637                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2638                        pr_info("mds%d reconnect success\n", session->s_mds);
2639                session->s_state = CEPH_MDS_SESSION_OPEN;
2640                renewed_caps(mdsc, session, 0);
2641                wake = 1;
2642                if (mdsc->stopping)
2643                        __close_session(mdsc, session);
2644                break;
2645
2646        case CEPH_SESSION_RENEWCAPS:
2647                if (session->s_renew_seq == seq)
2648                        renewed_caps(mdsc, session, 1);
2649                break;
2650
2651        case CEPH_SESSION_CLOSE:
2652                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2653                        pr_info("mds%d reconnect denied\n", session->s_mds);
2654                cleanup_session_requests(mdsc, session);
2655                remove_session_caps(session);
2656                wake = 2; /* for good measure */
2657                wake_up_all(&mdsc->session_close_wq);
2658                break;
2659
2660        case CEPH_SESSION_STALE:
2661                pr_info("mds%d caps went stale, renewing\n",
2662                        session->s_mds);
2663                spin_lock(&session->s_gen_ttl_lock);
2664                session->s_cap_gen++;
2665                session->s_cap_ttl = jiffies - 1;
2666                spin_unlock(&session->s_gen_ttl_lock);
2667                send_renew_caps(mdsc, session);
2668                break;
2669
2670        case CEPH_SESSION_RECALL_STATE:
2671                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2672                break;
2673
2674        case CEPH_SESSION_FLUSHMSG:
2675                send_flushmsg_ack(mdsc, session, seq);
2676                break;
2677
2678        case CEPH_SESSION_FORCE_RO:
2679                dout("force_session_readonly %p\n", session);
2680                spin_lock(&session->s_cap_lock);
2681                session->s_readonly = true;
2682                spin_unlock(&session->s_cap_lock);
2683                wake_up_session_caps(session, 0);
2684                break;
2685
2686        default:
2687                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2688                WARN_ON(1);
2689        }
2690
2691        mutex_unlock(&session->s_mutex);
2692        if (wake) {
2693                mutex_lock(&mdsc->mutex);
2694                __wake_requests(mdsc, &session->s_waiting);
2695                if (wake == 2)
2696                        kick_requests(mdsc, mds);
2697                mutex_unlock(&mdsc->mutex);
2698        }
2699        return;
2700
2701bad:
2702        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2703               (int)msg->front.iov_len);
2704        ceph_msg_dump(msg);
2705        return;
2706}
2707
2708
2709/*
2710 * called under session->s_mutex.
2711 */
2712static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2713                                   struct ceph_mds_session *session)
2714{
2715        struct ceph_mds_request *req, *nreq;
2716        struct rb_node *p;
2717        int err;
2718
2719        dout("replay_unsafe_requests mds%d\n", session->s_mds);
2720
2721        mutex_lock(&mdsc->mutex);
2722        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2723                err = __prepare_send_request(mdsc, req, session->s_mds, true);
2724                if (!err) {
2725                        ceph_msg_get(req->r_request);
2726                        ceph_con_send(&session->s_con, req->r_request);
2727                }
2728        }
2729
2730        /*
2731         * Also re-send old requests when the MDS enters the reconnect stage,
2732         * so that it can process completed requests in its clientreplay stage.
2733         */
2734        p = rb_first(&mdsc->request_tree);
2735        while (p) {
2736                req = rb_entry(p, struct ceph_mds_request, r_node);
2737                p = rb_next(p);
2738                if (req->r_got_unsafe)
2739                        continue;
2740                if (req->r_attempts == 0)
2741                        continue; /* only old requests */
2742                if (req->r_session &&
2743                    req->r_session->s_mds == session->s_mds) {
2744                        err = __prepare_send_request(mdsc, req,
2745                                                     session->s_mds, true);
2746                        if (!err) {
2747                                ceph_msg_get(req->r_request);
2748                                ceph_con_send(&session->s_con, req->r_request);
2749                        }
2750                }
2751        }
2752        mutex_unlock(&mdsc->mutex);
2753}
2754
2755/*
2756 * Encode information about a cap for a reconnect with the MDS.
2757 */
2758static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2759                          void *arg)
2760{
2761        union {
2762                struct ceph_mds_cap_reconnect v2;
2763                struct ceph_mds_cap_reconnect_v1 v1;
2764        } rec;
2765        size_t reclen;
2766        struct ceph_inode_info *ci;
2767        struct ceph_reconnect_state *recon_state = arg;
2768        struct ceph_pagelist *pagelist = recon_state->pagelist;
2769        char *path;
2770        int pathlen, err;
2771        u64 pathbase;
2772        struct dentry *dentry;
2773
2774        ci = cap->ci;
2775
2776        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2777             inode, ceph_vinop(inode), cap, cap->cap_id,
2778             ceph_cap_string(cap->issued));
2779        err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2780        if (err)
2781                return err;
2782
2783        dentry = d_find_alias(inode);
2784        if (dentry) {
2785                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2786                if (IS_ERR(path)) {
2787                        err = PTR_ERR(path);
2788                        goto out_dput;
2789                }
2790        } else {
2791                path = NULL;
2792                pathlen = 0;
2793        }
2794        err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2795        if (err)
2796                goto out_free;
2797
2798        spin_lock(&ci->i_ceph_lock);
2799        cap->seq = 0;        /* reset cap seq */
2800        cap->issue_seq = 0;  /* and issue_seq */
2801        cap->mseq = 0;       /* and migrate_seq */
2802        cap->cap_gen = cap->session->s_cap_gen;
2803
2804        if (recon_state->flock) {
2805                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2806                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2807                rec.v2.issued = cpu_to_le32(cap->issued);
2808                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2809                rec.v2.pathbase = cpu_to_le64(pathbase);
2810                rec.v2.flock_len = 0;
2811                reclen = sizeof(rec.v2);
2812        } else {
2813                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2814                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2815                rec.v1.issued = cpu_to_le32(cap->issued);
2816                rec.v1.size = cpu_to_le64(inode->i_size);
2817                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2818                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2819                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2820                rec.v1.pathbase = cpu_to_le64(pathbase);
2821                reclen = sizeof(rec.v1);
2822        }
2823        spin_unlock(&ci->i_ceph_lock);
2824
2825        if (recon_state->flock) {
2826                int num_fcntl_locks, num_flock_locks;
2827                struct ceph_filelock *flocks;
2828
2829encode_again:
2830                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2831                flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2832                                 sizeof(struct ceph_filelock), GFP_NOFS);
2833                if (!flocks) {
2834                        err = -ENOMEM;
2835                        goto out_free;
2836                }
2837                err = ceph_encode_locks_to_buffer(inode, flocks,
2838                                                  num_fcntl_locks,
2839                                                  num_flock_locks);
2840                if (err) {
2841                        kfree(flocks);
2842                        if (err == -ENOSPC)
2843                                goto encode_again;
2844                        goto out_free;
2845                }
2846                /*
2847                 * number of encoded locks is stable, so copy to pagelist
2848                 */
2849                rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2850                                    (num_fcntl_locks+num_flock_locks) *
2851                                    sizeof(struct ceph_filelock));
2852                err = ceph_pagelist_append(pagelist, &rec, reclen);
2853                if (!err)
2854                        err = ceph_locks_to_pagelist(flocks, pagelist,
2855                                                     num_fcntl_locks,
2856                                                     num_flock_locks);
2857                kfree(flocks);
2858        } else {
2859                err = ceph_pagelist_append(pagelist, &rec, reclen);
2860        }
2861
2862        recon_state->nr_caps++;
2863out_free:
2864        kfree(path);
2865out_dput:
2866        dput(dentry);
2867        return err;
2868}
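
/*
 * Per-cap reconnect record appended above, as a sketch (v2 when the
 * peer advertises CEPH_FEATURE_FLOCK, v1 otherwise):
 *
 *        u64    ino
 *        string path        // from ceph_mdsc_build_path()
 *        v2: cap_id, wanted, issued, snaprealm, pathbase, flock_len,
 *            then flock_len bytes of encoded file locks
 *        v1: cap_id, wanted, issued, size, mtime, atime, snaprealm,
 *            pathbase
 */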
2869
2870
2871/*
2872 * If an MDS fails and recovers, clients need to reconnect in order to
2873 * reestablish shared state.  This includes all caps issued through
2874 * this session _and_ the snap_realm hierarchy.  Because it's not
2875 * clear which snap realms the mds cares about, we send everything we
2876 * know about.. that ensures we'll then get any new info the
2877 * recovering MDS might have.
2878 *
2879 * This is a relatively heavyweight operation, but it's rare.
2880 *
2881 * called with mdsc->mutex held.
2882 */
2883static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2884                               struct ceph_mds_session *session)
2885{
2886        struct ceph_msg *reply;
2887        struct rb_node *p;
2888        int mds = session->s_mds;
2889        int err = -ENOMEM;
2890        int s_nr_caps;
2891        struct ceph_pagelist *pagelist;
2892        struct ceph_reconnect_state recon_state;
2893
2894        pr_info("mds%d reconnect start\n", mds);
2895
2896        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2897        if (!pagelist)
2898                goto fail_nopagelist;
2899        ceph_pagelist_init(pagelist);
2900
2901        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2902        if (!reply)
2903                goto fail_nomsg;
2904
2905        mutex_lock(&session->s_mutex);
2906        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2907        session->s_seq = 0;
2908
2909        dout("session %p state %s\n", session,
2910             ceph_session_state_name(session->s_state));
2911
2912        spin_lock(&session->s_gen_ttl_lock);
2913        session->s_cap_gen++;
2914        spin_unlock(&session->s_gen_ttl_lock);
2915
2916        spin_lock(&session->s_cap_lock);
2917        /* don't know if session is readonly */
2918        session->s_readonly = 0;
2919        /*
2920         * notify __ceph_remove_cap() that we are composing cap reconnect.
2921         * If a cap gets released before being added to the cap reconnect,
2922         * __ceph_remove_cap() should skip queuing the cap release.
2923         */
2924        session->s_cap_reconnect = 1;
2925        /* drop old cap expires; we're about to reestablish that state */
2926        cleanup_cap_releases(mdsc, session);
2927
2928        /* trim unused caps to reduce MDS's cache rejoin time */
2929        if (mdsc->fsc->sb->s_root)
2930                shrink_dcache_parent(mdsc->fsc->sb->s_root);
2931
2932        ceph_con_close(&session->s_con);
2933        ceph_con_open(&session->s_con,
2934                      CEPH_ENTITY_TYPE_MDS, mds,
2935                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2936
2937        /* replay unsafe requests */
2938        replay_unsafe_requests(mdsc, session);
2939
2940        down_read(&mdsc->snap_rwsem);
2941
2942        /* traverse this session's caps */
2943        s_nr_caps = session->s_nr_caps;
2944        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2945        if (err)
2946                goto fail;
2947
2948        recon_state.nr_caps = 0;
2949        recon_state.pagelist = pagelist;
2950        recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2951        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2952        if (err < 0)
2953                goto fail;
2954
2955        spin_lock(&session->s_cap_lock);
2956        session->s_cap_reconnect = 0;
2957        spin_unlock(&session->s_cap_lock);
2958
2959        /*
2960         * snaprealms.  we provide mds with the ino, seq (version), and
2961         * parent for all of our realms.  If the mds has any newer info,
2962         * it will tell us.
2963         */
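        /*
         * For reference, a sketch of the per-realm record built below,
         * matching the cpu_to_le64() stores in the loop:
         *
         *	struct ceph_mds_snaprealm_reconnect {
         *		__le64 ino;      - snap realm ino
         *		__le64 seq;      - snap seq for this realm
         *		__le64 parent;   - parent realm ino
         *	};
         */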
2964        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2965                struct ceph_snap_realm *realm =
2966                        rb_entry(p, struct ceph_snap_realm, node);
2967                struct ceph_mds_snaprealm_reconnect sr_rec;
2968
2969                dout(" adding snap realm %llx seq %lld parent %llx\n",
2970                     realm->ino, realm->seq, realm->parent_ino);
2971                sr_rec.ino = cpu_to_le64(realm->ino);
2972                sr_rec.seq = cpu_to_le64(realm->seq);
2973                sr_rec.parent = cpu_to_le64(realm->parent_ino);
2974                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2975                if (err)
2976                        goto fail;
2977        }
2978
2979        if (recon_state.flock)
2980                reply->hdr.version = cpu_to_le16(2);
2981
2982        /* raced with cap release? */
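        /*
         * nr_caps was the first __le32 appended to the pagelist, so it
         * sits at the very start of the first page; patch that word in
         * place with the number of cap records actually written.
         */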
2983        if (s_nr_caps != recon_state.nr_caps) {
2984                struct page *page = list_first_entry(&pagelist->head,
2985                                                     struct page, lru);
2986                __le32 *addr = kmap_atomic(page);
2987                *addr = cpu_to_le32(recon_state.nr_caps);
2988                kunmap_atomic(addr);
2989        }
2990
2991        reply->hdr.data_len = cpu_to_le32(pagelist->length);
2992        ceph_msg_data_add_pagelist(reply, pagelist);
2993
2994        ceph_early_kick_flushing_caps(mdsc, session);
2995
2996        ceph_con_send(&session->s_con, reply);
2997
2998        mutex_unlock(&session->s_mutex);
2999
3000        mutex_lock(&mdsc->mutex);
3001        __wake_requests(mdsc, &session->s_waiting);
3002        mutex_unlock(&mdsc->mutex);
3003
3004        up_read(&mdsc->snap_rwsem);
3005        return;
3006
3007fail:
3008        ceph_msg_put(reply);
3009        up_read(&mdsc->snap_rwsem);
3010        mutex_unlock(&session->s_mutex);
3011fail_nomsg:
3012        ceph_pagelist_release(pagelist);
3013fail_nopagelist:
3014        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3015        return;
3016}
3017
3018
3019/*
3020 * compare old and new mdsmaps, kicking requests
3021 * and closing out old connections as necessary
3022 *
3023 * called under mdsc->mutex.
3024 */
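    /*
     * In short: if an mds slot vanished or changed address, close the
     * session (or mark it restarting); once a restarting session's mds
     * reaches the reconnect state, send it a reconnect; and when an
     * mds goes active, kick waiting requests and flushing caps its way.
     */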
3025static void check_new_map(struct ceph_mds_client *mdsc,
3026                          struct ceph_mdsmap *newmap,
3027                          struct ceph_mdsmap *oldmap)
3028{
3029        int i;
3030        int oldstate, newstate;
3031        struct ceph_mds_session *s;
3032
3033        dout("check_new_map new %u old %u\n",
3034             newmap->m_epoch, oldmap->m_epoch);
3035
3036        for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
3037                if (mdsc->sessions[i] == NULL)
3038                        continue;
3039                s = mdsc->sessions[i];
3040                oldstate = ceph_mdsmap_get_state(oldmap, i);
3041                newstate = ceph_mdsmap_get_state(newmap, i);
3042
3043                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3044                     i, ceph_mds_state_name(oldstate),
3045                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3046                     ceph_mds_state_name(newstate),
3047                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3048                     ceph_session_state_name(s->s_state));
3049
3050                if (i >= newmap->m_max_mds ||
3051                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
3052                           ceph_mdsmap_get_addr(newmap, i),
3053                           sizeof(struct ceph_entity_addr))) {
3054                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3055                                /* the session never opened, just close it
3056                                 * out now */
3057                                __wake_requests(mdsc, &s->s_waiting);
3058                                __unregister_session(mdsc, s);
3059                        } else {
3060                                /* just close it */
3061                                mutex_unlock(&mdsc->mutex);
3062                                mutex_lock(&s->s_mutex);
3063                                mutex_lock(&mdsc->mutex);
3064                                ceph_con_close(&s->s_con);
3065                                mutex_unlock(&s->s_mutex);
3066                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
3067                        }
3068                } else if (oldstate == newstate) {
3069                        continue;  /* nothing new with this mds */
3070                }
3071
3072                /*
3073                 * send reconnect?
3074                 */
3075                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3076                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3077                        mutex_unlock(&mdsc->mutex);
3078                        send_mds_reconnect(mdsc, s);
3079                        mutex_lock(&mdsc->mutex);
3080                }
3081
3082                /*
3083                  * kick requests on any mds that has gone active.
3084                 */
3085                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3086                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3087                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3088                            oldstate != CEPH_MDS_STATE_STARTING)
3089                                pr_info("mds%d recovery completed\n", s->s_mds);
3090                        kick_requests(mdsc, i);
3091                        ceph_kick_flushing_caps(mdsc, s);
3092                        wake_up_session_caps(s, 1);
3093                }
3094        }
3095
3096        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
3097                s = mdsc->sessions[i];
3098                if (!s)
3099                        continue;
3100                if (!ceph_mdsmap_is_laggy(newmap, i))
3101                        continue;
3102                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3103                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3104                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3105                        dout(" connecting to export targets of laggy mds%d\n",
3106                             i);
3107                        __open_export_target_sessions(mdsc, s);
3108                }
3109        }
3110}
3111
3112
3113
3114/*
3115 * leases
3116 */
3117
3118/*
3119 * caller must hold session s_mutex, dentry->d_lock
3120 */
3121void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3122{
3123        struct ceph_dentry_info *di = ceph_dentry(dentry);
3124
3125        ceph_put_mds_session(di->lease_session);
3126        di->lease_session = NULL;
3127}
3128
3129static void handle_lease(struct ceph_mds_client *mdsc,
3130                         struct ceph_mds_session *session,
3131                         struct ceph_msg *msg)
3132{
3133        struct super_block *sb = mdsc->fsc->sb;
3134        struct inode *inode;
3135        struct dentry *parent, *dentry;
3136        struct ceph_dentry_info *di;
3137        int mds = session->s_mds;
3138        struct ceph_mds_lease *h = msg->front.iov_base;
3139        u32 seq;
3140        struct ceph_vino vino;
3141        struct qstr dname;
3142        int release = 0;
3143
3144        dout("handle_lease from mds%d\n", mds);
3145
3146        /* decode */
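        /*
         * Front layout, mirrored by the checks below: a fixed-size
         * struct ceph_mds_lease, a __le32 dentry-name length, then the
         * name bytes themselves.
         */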
3147        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3148                goto bad;
3149        vino.ino = le64_to_cpu(h->ino);
3150        vino.snap = CEPH_NOSNAP;
3151        seq = le32_to_cpu(h->seq);
3152        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3153        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3154        if (dname.len != get_unaligned_le32(h+1))
3155                goto bad;
3156
3157        /* lookup inode */
3158        inode = ceph_find_inode(sb, vino);
3159        dout("handle_lease %s, ino %llx %p %.*s\n",
3160             ceph_lease_op_name(h->action), vino.ino, inode,
3161             dname.len, dname.name);
3162
3163        mutex_lock(&session->s_mutex);
3164        session->s_seq++;
3165
3166        if (inode == NULL) {
3167                dout("handle_lease no inode %llx\n", vino.ino);
3168                goto release;
3169        }
3170
3171        /* dentry */
3172        parent = d_find_alias(inode);
3173        if (!parent) {
3174                dout("no parent dentry on inode %p\n", inode);
3175                WARN_ON(1);
3176                goto release;  /* hrm... */
3177        }
3178        dname.hash = full_name_hash(dname.name, dname.len);
3179        dentry = d_lookup(parent, &dname);
3180        dput(parent);
3181        if (!dentry)
3182                goto release;
3183
3184        spin_lock(&dentry->d_lock);
3185        di = ceph_dentry(dentry);
3186        switch (h->action) {
3187        case CEPH_MDS_LEASE_REVOKE:
3188                if (di->lease_session == session) {
3189                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3190                                h->seq = cpu_to_le32(di->lease_seq);
3191                        __ceph_mdsc_drop_dentry_lease(dentry);
3192                }
3193                release = 1;
3194                break;
3195
3196        case CEPH_MDS_LEASE_RENEW:
3197                if (di->lease_session == session &&
3198                    di->lease_gen == session->s_cap_gen &&
3199                    di->lease_renew_from &&
3200                    di->lease_renew_after == 0) {
3201                        unsigned long duration =
3202                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3203
3204                        di->lease_seq = seq;
3205                        dentry->d_time = di->lease_renew_from + duration;
3206                        di->lease_renew_after = di->lease_renew_from +
3207                                (duration >> 1);
3208                        di->lease_renew_from = 0;
3209                }
3210                break;
3211        }
3212        spin_unlock(&dentry->d_lock);
3213        dput(dentry);
3214
3215        if (!release)
3216                goto out;
3217
3218release:
3219        /* let's just reuse the same message */
3220        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3221        ceph_msg_get(msg);
3222        ceph_con_send(&session->s_con, msg);
3223
3224out:
3225        iput(inode);
3226        mutex_unlock(&session->s_mutex);
3227        return;
3228
3229bad:
3230        pr_err("corrupt lease message\n");
3231        ceph_msg_dump(msg);
3232}
3233
3234void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3235                              struct inode *inode,
3236                              struct dentry *dentry, char action,
3237                              u32 seq)
3238{
3239        struct ceph_msg *msg;
3240        struct ceph_mds_lease *lease;
3241        int len = sizeof(*lease) + sizeof(u32);
3242        int dnamelen = 0;
3243
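        /*
         * The message built here uses the same front layout that
         * handle_lease() decodes: struct ceph_mds_lease, a __le32 name
         * length, then the name bytes.
         */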
3244        dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3245             inode, dentry, ceph_lease_op_name(action), session->s_mds);
3246        dnamelen = dentry->d_name.len;
3247        len += dnamelen;
3248
3249        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3250        if (!msg)
3251                return;
3252        lease = msg->front.iov_base;
3253        lease->action = action;
3254        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3255        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3256        lease->seq = cpu_to_le32(seq);
3257        put_unaligned_le32(dnamelen, lease + 1);
3258        memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3259
3260        /*
3261         * if this is a preemptive lease RELEASE, no need to
3262         * flush request stream, since the actual request will
3263         * soon follow.
3264         */
3265        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3266
3267        ceph_con_send(&session->s_con, msg);
3268}
3269
3270/*
3271 * Preemptively release a lease we expect to invalidate anyway.
3272  * Both @inode and @dentry are required (note the BUG_ON()s below).
3273 */
3274void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
3275                             struct dentry *dentry)
3276{
3277        struct ceph_dentry_info *di;
3278        struct ceph_mds_session *session;
3279        u32 seq;
3280
3281        BUG_ON(inode == NULL);
3282        BUG_ON(dentry == NULL);
3283
3284        /* is dentry lease valid? */
3285        spin_lock(&dentry->d_lock);
3286        di = ceph_dentry(dentry);
3287        if (!di || !di->lease_session ||
3288            di->lease_session->s_mds < 0 ||
3289            di->lease_gen != di->lease_session->s_cap_gen ||
3290            !time_before(jiffies, dentry->d_time)) {
3291                dout("lease_release inode %p dentry %p -- "
3292                     "no lease\n",
3293                     inode, dentry);
3294                spin_unlock(&dentry->d_lock);
3295                return;
3296        }
3297
3298        /* we do have a lease on this dentry; note mds and seq */
3299        session = ceph_get_mds_session(di->lease_session);
3300        seq = di->lease_seq;
3301        __ceph_mdsc_drop_dentry_lease(dentry);
3302        spin_unlock(&dentry->d_lock);
3303
3304        dout("lease_release inode %p dentry %p to mds%d\n",
3305             inode, dentry, session->s_mds);
3306        ceph_mdsc_lease_send_msg(session, inode, dentry,
3307                                 CEPH_MDS_LEASE_RELEASE, seq);
3308        ceph_put_mds_session(session);
3309}
3310
3311/*
3312 * drop all leases (and dentry refs) in preparation for umount
3313 */
3314static void drop_leases(struct ceph_mds_client *mdsc)
3315{
3316        int i;
3317
3318        dout("drop_leases\n");
3319        mutex_lock(&mdsc->mutex);
3320        for (i = 0; i < mdsc->max_sessions; i++) {
3321                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3322                if (!s)
3323                        continue;
3324                mutex_unlock(&mdsc->mutex);
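                /*
                 * Cycling s_mutex simply waits out any handler (e.g.
                 * handle_lease()) still running under it; the leases
                 * themselves are dropped as the dentries are torn down.
                 */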
3325                mutex_lock(&s->s_mutex);
3326                mutex_unlock(&s->s_mutex);
3327                ceph_put_mds_session(s);
3328                mutex_lock(&mdsc->mutex);
3329        }
3330        mutex_unlock(&mdsc->mutex);
3331}
3332
3333
3334
3335/*
3336 * delayed work -- periodically trim expired leases, renew caps with mds
3337 */
3338static void schedule_delayed(struct ceph_mds_client *mdsc)
3339{
3340        int delay = 5;
3341        unsigned hz = round_jiffies_relative(HZ * delay);
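        /*
         * round_jiffies_relative() rounds the ~5 second delay to a
         * whole jiffy second so periodic wakeups can batch together;
         * delayed_work() re-arms us each time it runs.
         */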
3342        schedule_delayed_work(&mdsc->delayed_work, hz);
3343}
3344
3345static void delayed_work(struct work_struct *work)
3346{
3347        int i;
3348        struct ceph_mds_client *mdsc =
3349                container_of(work, struct ceph_mds_client, delayed_work.work);
3350        int renew_interval;
3351        int renew_caps;
3352
3353        dout("mdsc delayed_work\n");
3354        ceph_check_delayed_caps(mdsc);
3355
3356        mutex_lock(&mdsc->mutex);
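        /*
         * Renew caps once every quarter of the mdsmap session timeout;
         * e.g. a 60 second session timeout gives a 15 second renew
         * interval.
         */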
3357        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3358        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3359                                   mdsc->last_renew_caps);
3360        if (renew_caps)
3361                mdsc->last_renew_caps = jiffies;
3362
3363        for (i = 0; i < mdsc->max_sessions; i++) {
3364                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3365                if (s == NULL)
3366                        continue;
3367                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3368                        dout("resending session close request for mds%d\n",
3369                             s->s_mds);
3370                        request_close_session(mdsc, s);
3371                        ceph_put_mds_session(s);
3372                        continue;
3373                }
3374                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3375                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3376                                s->s_state = CEPH_MDS_SESSION_HUNG;
3377                                pr_info("mds%d hung\n", s->s_mds);
3378                        }
3379                }
3380                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3381                         /* this mds failed or is recovering; just wait */
3382                        ceph_put_mds_session(s);
3383                        continue;
3384                }
3385                mutex_unlock(&mdsc->mutex);
3386
3387                mutex_lock(&s->s_mutex);
3388                if (renew_caps)
3389                        send_renew_caps(mdsc, s);
3390                else
3391                        ceph_con_keepalive(&s->s_con);
3392                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3393                    s->s_state == CEPH_MDS_SESSION_HUNG)
3394                        ceph_send_cap_releases(mdsc, s);
3395                mutex_unlock(&s->s_mutex);
3396                ceph_put_mds_session(s);
3397
3398                mutex_lock(&mdsc->mutex);
3399        }
3400        mutex_unlock(&mdsc->mutex);
3401
3402        schedule_delayed(mdsc);
3403}
3404
3405int ceph_mdsc_init(struct ceph_fs_client *fsc)
3407{
3408        struct ceph_mds_client *mdsc;
3409
3410        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3411        if (!mdsc)
3412                return -ENOMEM;
3413        mdsc->fsc = fsc;
3414        fsc->mdsc = mdsc;
3415        mutex_init(&mdsc->mutex);
3416        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3417        if (mdsc->mdsmap == NULL) {
3418                kfree(mdsc);
3419                return -ENOMEM;
3420        }
3421
3422        init_completion(&mdsc->safe_umount_waiters);
3423        init_waitqueue_head(&mdsc->session_close_wq);
3424        INIT_LIST_HEAD(&mdsc->waiting_for_map);
3425        mdsc->sessions = NULL;
3426        atomic_set(&mdsc->num_sessions, 0);
3427        mdsc->max_sessions = 0;
3428        mdsc->stopping = 0;
3429        mdsc->last_snap_seq = 0;
3430        init_rwsem(&mdsc->snap_rwsem);
3431        mdsc->snap_realms = RB_ROOT;
3432        INIT_LIST_HEAD(&mdsc->snap_empty);
3433        spin_lock_init(&mdsc->snap_empty_lock);
3434        mdsc->last_tid = 0;
3435        mdsc->oldest_tid = 0;
3436        mdsc->request_tree = RB_ROOT;
3437        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3438        mdsc->last_renew_caps = jiffies;
3439        INIT_LIST_HEAD(&mdsc->cap_delay_list);
3440        spin_lock_init(&mdsc->cap_delay_lock);
3441        INIT_LIST_HEAD(&mdsc->snap_flush_list);
3442        spin_lock_init(&mdsc->snap_flush_lock);
3443        mdsc->last_cap_flush_tid = 1;
3444        mdsc->cap_flush_tree = RB_ROOT;
3445        INIT_LIST_HEAD(&mdsc->cap_dirty);
3446        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3447        mdsc->num_cap_flushing = 0;
3448        spin_lock_init(&mdsc->cap_dirty_lock);
3449        init_waitqueue_head(&mdsc->cap_flushing_wq);
3450        spin_lock_init(&mdsc->dentry_lru_lock);
3451        INIT_LIST_HEAD(&mdsc->dentry_lru);
3452
3453        ceph_caps_init(mdsc);
3454        ceph_adjust_min_caps(mdsc, fsc->min_caps);
3455
3456        init_rwsem(&mdsc->pool_perm_rwsem);
3457        mdsc->pool_perm_tree = RB_ROOT;
3458
3459        return 0;
3460}
3461
3462/*
3463 * Wait for safe replies on open mds requests.  If we time out, drop
3464 * all requests from the tree to avoid dangling dentry refs.
3465 */
3466static void wait_requests(struct ceph_mds_client *mdsc)
3467{
3468        struct ceph_options *opts = mdsc->fsc->client->options;
3469        struct ceph_mds_request *req;
3470
3471        mutex_lock(&mdsc->mutex);
3472        if (__get_oldest_req(mdsc)) {
3473                mutex_unlock(&mdsc->mutex);
3474
3475                dout("wait_requests waiting for requests\n");
3476                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3477                                    ceph_timeout_jiffies(opts->mount_timeout));
3478
3479                /* tear down remaining requests */
3480                mutex_lock(&mdsc->mutex);
3481                while ((req = __get_oldest_req(mdsc))) {
3482                        dout("wait_requests timed out on tid %llu\n",
3483                             req->r_tid);
3484                        __unregister_request(mdsc, req);
3485                }
3486        }
3487        mutex_unlock(&mdsc->mutex);
3488        dout("wait_requests done\n");
3489}
3490
3491/*
3492 * called before mount is ro, and before dentries are torn down.
3493 * (hmm, does this still race with new lookups?)
3494 */
3495void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3496{
3497        dout("pre_umount\n");
3498        mdsc->stopping = 1;
3499
3500        drop_leases(mdsc);
3501        ceph_flush_dirty_caps(mdsc);
3502        wait_requests(mdsc);
3503
3504        /*
3505         * wait for reply handlers to drop their request refs and
3506         * their inode/dcache refs
3507         */
3508        ceph_msgr_flush();
3509}
3510
3511/*
3512 * wait for all write mds requests to flush.
3513 */
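    /*
     * Write ops are identified by the CEPH_MDS_OP_WRITE flag in r_op.
     * Setfilelock requests carry that flag too but are skipped here,
     * since a pending lock request may block indefinitely on a
     * conflicting holder.
     */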
3514static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3515{
3516        struct ceph_mds_request *req = NULL, *nextreq;
3517        struct rb_node *n;
3518
3519        mutex_lock(&mdsc->mutex);
3520        dout("wait_unsafe_requests want %lld\n", want_tid);
3521restart:
3522        req = __get_oldest_req(mdsc);
3523        while (req && req->r_tid <= want_tid) {
3524                /* find next request */
3525                n = rb_next(&req->r_node);
3526                if (n)
3527                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3528                else
3529                        nextreq = NULL;
3530                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3531                    (req->r_op & CEPH_MDS_OP_WRITE)) {
3532                        /* write op */
3533                        ceph_mdsc_get_request(req);
3534                        if (nextreq)
3535                                ceph_mdsc_get_request(nextreq);
3536                        mutex_unlock(&mdsc->mutex);
3537                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3538                             req->r_tid, want_tid);
3539                        wait_for_completion(&req->r_safe_completion);
3540                        mutex_lock(&mdsc->mutex);
3541                        ceph_mdsc_put_request(req);
3542                        if (!nextreq)
3543                                 break;  /* no next request existed, so we're done! */
3544                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
3545                                /* next request was removed from tree */
3546                                ceph_mdsc_put_request(nextreq);
3547                                goto restart;
3548                        }
3549                        ceph_mdsc_put_request(nextreq);  /* won't go away */
3550                }
3551                req = nextreq;
3552        }
3553        mutex_unlock(&mdsc->mutex);
3554        dout("wait_unsafe_requests done\n");
3555}
3556
3557void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3558{
3559        u64 want_tid, want_flush, want_snap;
3560
3561        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3562                return;
3563
3564        dout("sync\n");
3565        mutex_lock(&mdsc->mutex);
3566        want_tid = mdsc->last_tid;
3567        mutex_unlock(&mdsc->mutex);
3568
3569        ceph_flush_dirty_caps(mdsc);
3570        spin_lock(&mdsc->cap_dirty_lock);
3571        want_flush = mdsc->last_cap_flush_tid;
3572        spin_unlock(&mdsc->cap_dirty_lock);
3573
3574        down_read(&mdsc->snap_rwsem);
3575        want_snap = mdsc->last_snap_seq;
3576        up_read(&mdsc->snap_rwsem);
3577
3578        dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
3579             want_tid, want_flush, want_snap);
3580
3581        wait_unsafe_requests(mdsc, want_tid);
3582        wait_caps_flush(mdsc, want_flush, want_snap);
3583}
3584
3585/*
3586 * true if all sessions are closed, or we force unmount
3587 */
3588static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3589{
3590        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3591                return true;
3592        return atomic_read(&mdsc->num_sessions) == 0;
3593}
3594
3595/*
3596 * called after sb is ro.
3597 */
3598void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3599{
3600        struct ceph_options *opts = mdsc->fsc->client->options;
3601        struct ceph_mds_session *session;
3602        int i;
3603
3604        dout("close_sessions\n");
3605
3606        /* close sessions */
3607        mutex_lock(&mdsc->mutex);
3608        for (i = 0; i < mdsc->max_sessions; i++) {
3609                session = __ceph_lookup_mds_session(mdsc, i);
3610                if (!session)
3611                        continue;
3612                mutex_unlock(&mdsc->mutex);
3613                mutex_lock(&session->s_mutex);
3614                __close_session(mdsc, session);
3615                mutex_unlock(&session->s_mutex);
3616                ceph_put_mds_session(session);
3617                mutex_lock(&mdsc->mutex);
3618        }
3619        mutex_unlock(&mdsc->mutex);
3620
3621        dout("waiting for sessions to close\n");
3622        wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3623                           ceph_timeout_jiffies(opts->mount_timeout));
3624
3625        /* tear down remaining sessions */
3626        mutex_lock(&mdsc->mutex);
3627        for (i = 0; i < mdsc->max_sessions; i++) {
3628                if (mdsc->sessions[i]) {
3629                        session = get_session(mdsc->sessions[i]);
3630                        __unregister_session(mdsc, session);
3631                        mutex_unlock(&mdsc->mutex);
3632                        mutex_lock(&session->s_mutex);
3633                        remove_session_caps(session);
3634                        mutex_unlock(&session->s_mutex);
3635                        ceph_put_mds_session(session);
3636                        mutex_lock(&mdsc->mutex);
3637                }
3638        }
3639        WARN_ON(!list_empty(&mdsc->cap_delay_list));
3640        mutex_unlock(&mdsc->mutex);
3641
3642        ceph_cleanup_empty_realms(mdsc);
3643
3644        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3645
3646        dout("stopped\n");
3647}
3648
3649void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3650{
3651        struct ceph_mds_session *session;
3652        int mds;
3653
3654        dout("force umount\n");
3655
3656        mutex_lock(&mdsc->mutex);
3657        for (mds = 0; mds < mdsc->max_sessions; mds++) {
3658                session = __ceph_lookup_mds_session(mdsc, mds);
3659                if (!session)
3660                        continue;
3661                mutex_unlock(&mdsc->mutex);
3662                mutex_lock(&session->s_mutex);
3663                __close_session(mdsc, session);
3664                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3665                        cleanup_session_requests(mdsc, session);
3666                        remove_session_caps(session);
3667                }
3668                mutex_unlock(&session->s_mutex);
3669                ceph_put_mds_session(session);
3670                mutex_lock(&mdsc->mutex);
3671                kick_requests(mdsc, mds);
3672        }
3673        __wake_requests(mdsc, &mdsc->waiting_for_map);
3674        mutex_unlock(&mdsc->mutex);
3675}
3676
3677static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3678{
3679        dout("stop\n");
3680        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3681        if (mdsc->mdsmap)
3682                ceph_mdsmap_destroy(mdsc->mdsmap);
3683        kfree(mdsc->sessions);
3684        ceph_caps_finalize(mdsc);
3685        ceph_pool_perm_destroy(mdsc);
3686}
3687
3688void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3689{
3690        struct ceph_mds_client *mdsc = fsc->mdsc;
3691
3692        dout("mdsc_destroy %p\n", mdsc);
3693        ceph_mdsc_stop(mdsc);
3694
3695        /* flush out any connection work with references to us */
3696        ceph_msgr_flush();
3697
3698        fsc->mdsc = NULL;
3699        kfree(mdsc);
3700        dout("mdsc_destroy %p done\n", mdsc);
3701}
3702
3703
3704/*
3705 * handle mds map update.
3706 */
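    /*
     * Front layout, matching the decodes below: the cluster fsid, a
     * __le32 map epoch, a __le32 map length, then the encoded mdsmap
     * itself.
     */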
3707void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3708{
3709        u32 epoch;
3710        u32 maplen;
3711        void *p = msg->front.iov_base;
3712        void *end = p + msg->front.iov_len;
3713        struct ceph_mdsmap *newmap, *oldmap;
3714        struct ceph_fsid fsid;
3715        int err = -EINVAL;
3716
3717        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3718        ceph_decode_copy(&p, &fsid, sizeof(fsid));
3719        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3720                return;
3721        epoch = ceph_decode_32(&p);
3722        maplen = ceph_decode_32(&p);
3723        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3724
3725        /* do we need it? */
3726        ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3727        mutex_lock(&mdsc->mutex);
3728        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3729                dout("handle_map epoch %u <= our %u\n",
3730                     epoch, mdsc->mdsmap->m_epoch);
3731                mutex_unlock(&mdsc->mutex);
3732                return;
3733        }
3734
3735        newmap = ceph_mdsmap_decode(&p, end);
3736        if (IS_ERR(newmap)) {
3737                err = PTR_ERR(newmap);
3738                goto bad_unlock;
3739        }
3740
3741        /* swap into place */
3742        if (mdsc->mdsmap) {
3743                oldmap = mdsc->mdsmap;
3744                mdsc->mdsmap = newmap;
3745                check_new_map(mdsc, newmap, oldmap);
3746                ceph_mdsmap_destroy(oldmap);
3747        } else {
3748                mdsc->mdsmap = newmap;  /* first mds map */
3749        }
3750        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3751
3752        __wake_requests(mdsc, &mdsc->waiting_for_map);
3753
3754        mutex_unlock(&mdsc->mutex);
3755        schedule_delayed(mdsc);
3756        return;
3757
3758bad_unlock:
3759        mutex_unlock(&mdsc->mutex);
3760bad:
3761        pr_err("error decoding mdsmap %d\n", err);
3762        return;
3763}
3764
3765static struct ceph_connection *con_get(struct ceph_connection *con)
3766{
3767        struct ceph_mds_session *s = con->private;
3768
3769        if (get_session(s)) {
3770                dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3771                return con;
3772        }
3773        dout("mdsc con_get %p FAIL\n", s);
3774        return NULL;
3775}
3776
3777static void con_put(struct ceph_connection *con)
3778{
3779        struct ceph_mds_session *s = con->private;
3780
3781        dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3782        ceph_put_mds_session(s);
3783}
3784
3785/*
3786 * if the client is unresponsive for long enough, the mds will kill
3787 * the session entirely.
3788 */
3789static void peer_reset(struct ceph_connection *con)
3790{
3791        struct ceph_mds_session *s = con->private;
3792        struct ceph_mds_client *mdsc = s->s_mdsc;
3793
3794        pr_warn("mds%d closed our session\n", s->s_mds);
3795        send_mds_reconnect(mdsc, s);
3796}
3797
3798static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3799{
3800        struct ceph_mds_session *s = con->private;
3801        struct ceph_mds_client *mdsc = s->s_mdsc;
3802        int type = le16_to_cpu(msg->hdr.type);
3803
3804        mutex_lock(&mdsc->mutex);
3805        if (__verify_registered_session(mdsc, s) < 0) {
3806                mutex_unlock(&mdsc->mutex);
3807                goto out;
3808        }
3809        mutex_unlock(&mdsc->mutex);
3810
3811        switch (type) {
3812        case CEPH_MSG_MDS_MAP:
3813                ceph_mdsc_handle_map(mdsc, msg);
3814                break;
3815        case CEPH_MSG_CLIENT_SESSION:
3816                handle_session(s, msg);
3817                break;
3818        case CEPH_MSG_CLIENT_REPLY:
3819                handle_reply(s, msg);
3820                break;
3821        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3822                handle_forward(mdsc, s, msg);
3823                break;
3824        case CEPH_MSG_CLIENT_CAPS:
3825                ceph_handle_caps(s, msg);
3826                break;
3827        case CEPH_MSG_CLIENT_SNAP:
3828                ceph_handle_snap(mdsc, s, msg);
3829                break;
3830        case CEPH_MSG_CLIENT_LEASE:
3831                handle_lease(mdsc, s, msg);
3832                break;
3833
3834        default:
3835                pr_err("received unknown message type %d %s\n", type,
3836                       ceph_msg_type_name(type));
3837        }
3838out:
3839        ceph_msg_put(msg);
3840}
3841
3842/*
3843 * authentication
3844 */
3845
3846/*
3847 * Note: returned pointer is the address of a structure that's
3848 * managed separately.  Caller must *not* attempt to free it.
3849 */
3850static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3851                                        int *proto, int force_new)
3852{
3853        struct ceph_mds_session *s = con->private;
3854        struct ceph_mds_client *mdsc = s->s_mdsc;
3855        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3856        struct ceph_auth_handshake *auth = &s->s_auth;
3857
3858        if (force_new && auth->authorizer) {
3859                ceph_auth_destroy_authorizer(ac, auth->authorizer);
3860                auth->authorizer = NULL;
3861        }
3862        if (!auth->authorizer) {
3863                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3864                                                      auth);
3865                if (ret)
3866                        return ERR_PTR(ret);
3867        } else {
3868                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3869                                                      auth);
3870                if (ret)
3871                        return ERR_PTR(ret);
3872        }
3873        *proto = ac->protocol;
3874
3875        return auth;
3876}
3877
3878
3879static int verify_authorizer_reply(struct ceph_connection *con, int len)
3880{
3881        struct ceph_mds_session *s = con->private;
3882        struct ceph_mds_client *mdsc = s->s_mdsc;
3883        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3884
3885        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3886}
3887
3888static int invalidate_authorizer(struct ceph_connection *con)
3889{
3890        struct ceph_mds_session *s = con->private;
3891        struct ceph_mds_client *mdsc = s->s_mdsc;
3892        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3893
3894        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3895
3896        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3897}
3898
3899static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3900                                struct ceph_msg_header *hdr, int *skip)
3901{
3902        struct ceph_msg *msg;
3903        int type = (int) le16_to_cpu(hdr->type);
3904        int front_len = (int) le32_to_cpu(hdr->front_len);
3905
3906        if (con->in_msg)
3907                return con->in_msg;
3908
3909        *skip = 0;
3910        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3911        if (!msg) {
3912                pr_err("unable to allocate msg type %d len %d\n",
3913                       type, front_len);
3914                return NULL;
3915        }
3916
3917        return msg;
3918}
3919
3920static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
3921{
3922        struct ceph_mds_session *s = con->private;
3923        struct ceph_auth_handshake *auth = &s->s_auth;
3924        return ceph_auth_sign_message(auth, msg);
3925}
3926
3927static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
3928{
3929        struct ceph_mds_session *s = con->private;
3930        struct ceph_auth_handshake *auth = &s->s_auth;
3931        return ceph_auth_check_message_signature(auth, msg);
3932}
3933
3934static const struct ceph_connection_operations mds_con_ops = {
3935        .get = con_get,
3936        .put = con_put,
3937        .dispatch = dispatch,
3938        .get_authorizer = get_authorizer,
3939        .verify_authorizer_reply = verify_authorizer_reply,
3940        .invalidate_authorizer = invalidate_authorizer,
3941        .peer_reset = peer_reset,
3942        .alloc_msg = mds_alloc_msg,
3943        .sign_message = sign_message,
3944        .check_message_signature = check_message_signature,
3945};
3946
3947/* eof */
3948