linux/fs/ceph/mds_client.c
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/fs.h>
   5#include <linux/wait.h>
   6#include <linux/slab.h>
   7#include <linux/gfp.h>
   8#include <linux/sched.h>
   9#include <linux/debugfs.h>
  10#include <linux/seq_file.h>
  11#include <linux/ratelimit.h>
  12
  13#include "super.h"
  14#include "mds_client.h"
  15
  16#include <linux/ceph/ceph_features.h>
  17#include <linux/ceph/messenger.h>
  18#include <linux/ceph/decode.h>
  19#include <linux/ceph/pagelist.h>
  20#include <linux/ceph/auth.h>
  21#include <linux/ceph/debugfs.h>
  22
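/*
 * Upper bound on the payload of a single session reconnect message
 * (a descriptive note added here; the constant is used by the
 * reconnect encoding later in this file).
 */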
  23#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
  24
  25/*
  26 * A cluster of MDS (metadata server) daemons is responsible for
  27 * managing the file system namespace (the directory hierarchy and
  28 * inodes) and for coordinating shared access to storage.  Metadata is
  29 * partitioned hierarchically across a number of servers, and that
  30 * partition varies over time as the cluster adjusts the distribution
  31 * in order to balance load.
  32 *
  33 * The MDS client is primarily responsible for managing synchronous
  34 * metadata requests for operations like open, unlink, and so forth.
  35 * If there is a MDS failure, we find out about it when we (possibly
  36 * request and) receive a new MDS map, and can resubmit affected
  37 * requests.
  38 *
  39 * For the most part, though, we take advantage of a lossless
  40 * communications channel to the MDS, and do not need to worry about
  41 * timing out or resubmitting requests.
  42 *
  43 * We maintain a stateful "session" with each MDS we interact with.
  44 * Within each session, we send periodic heartbeat messages to ensure
  45 * any capabilities or leases we have been issued remain valid.  If
  46 * the session times out and goes stale, our leases and capabilities
  47 * are no longer valid.
  48 */
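
/*
 * Illustrative sketch only (not part of this file): how callers elsewhere
 * in fs/ceph typically drive a synchronous request through this layer.
 * The op and field choices below are assumptions made for illustration.
 *
 *	struct ceph_mds_request *req;
 *	int err;
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */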
  49
  50struct ceph_reconnect_state {
  51        struct ceph_mds_session *session;
  52        int nr_caps, nr_realms;
  53        struct ceph_pagelist *pagelist;
  54        unsigned msg_version;
  55        bool allow_multi;
  56};
  57
  58static void __wake_requests(struct ceph_mds_client *mdsc,
  59                            struct list_head *head);
  60static void ceph_cap_release_work(struct work_struct *work);
  61static void ceph_cap_reclaim_work(struct work_struct *work);
  62
  63static const struct ceph_connection_operations mds_con_ops;
  64
  65
  66/*
  67 * mds reply parsing
  68 */
  69
  70static int parse_reply_info_quota(void **p, void *end,
  71                                  struct ceph_mds_reply_info_in *info)
  72{
  73        u8 struct_v, struct_compat;
  74        u32 struct_len;
  75
  76        ceph_decode_8_safe(p, end, struct_v, bad);
  77        ceph_decode_8_safe(p, end, struct_compat, bad);
  78        /* struct_v is expected to be >= 1. we only
  79         * understand encoding with struct_compat == 1. */
  80        if (!struct_v || struct_compat != 1)
  81                goto bad;
  82        ceph_decode_32_safe(p, end, struct_len, bad);
  83        ceph_decode_need(p, end, struct_len, bad);
  84        end = *p + struct_len;
  85        ceph_decode_64_safe(p, end, info->max_bytes, bad);
  86        ceph_decode_64_safe(p, end, info->max_files, bad);
  87        *p = end;
  88        return 0;
  89bad:
  90        return -EIO;
  91}
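
/*
 * The pattern above recurs throughout this file: the ceph_decode_*_safe()
 * helpers bounds-check the buffer and jump to the supplied label on a
 * short read, while the (struct_v, struct_compat, struct_len) triple lets
 * an older client skip fields appended by newer servers, by clamping
 * `end` down to the start of the struct plus struct_len.
 */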
  92
  93/*
  94 * parse individual inode info
  95 */
  96static int parse_reply_info_in(void **p, void *end,
  97                               struct ceph_mds_reply_info_in *info,
  98                               u64 features)
  99{
 100        int err = 0;
 101        u8 struct_v = 0;
 102
 103        if (features == (u64)-1) {
 104                u32 struct_len;
 105                u8 struct_compat;
 106                ceph_decode_8_safe(p, end, struct_v, bad);
 107                ceph_decode_8_safe(p, end, struct_compat, bad);
 108                /* struct_v is expected to be >= 1. we only understand
 109                 * encoding with struct_compat == 1. */
 110                if (!struct_v || struct_compat != 1)
 111                        goto bad;
 112                ceph_decode_32_safe(p, end, struct_len, bad);
 113                ceph_decode_need(p, end, struct_len, bad);
 114                end = *p + struct_len;
 115        }
 116
 117        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
 118        info->in = *p;
 119        *p += sizeof(struct ceph_mds_reply_inode) +
 120                sizeof(*info->in->fragtree.splits) *
 121                le32_to_cpu(info->in->fragtree.nsplits);
 122
 123        ceph_decode_32_safe(p, end, info->symlink_len, bad);
 124        ceph_decode_need(p, end, info->symlink_len, bad);
 125        info->symlink = *p;
 126        *p += info->symlink_len;
 127
 128        ceph_decode_copy_safe(p, end, &info->dir_layout,
 129                              sizeof(info->dir_layout), bad);
 130        ceph_decode_32_safe(p, end, info->xattr_len, bad);
 131        ceph_decode_need(p, end, info->xattr_len, bad);
 132        info->xattr_data = *p;
 133        *p += info->xattr_len;
 134
 135        if (features == (u64)-1) {
 136                /* inline data */
 137                ceph_decode_64_safe(p, end, info->inline_version, bad);
 138                ceph_decode_32_safe(p, end, info->inline_len, bad);
 139                ceph_decode_need(p, end, info->inline_len, bad);
 140                info->inline_data = *p;
 141                *p += info->inline_len;
 142                /* quota */
 143                err = parse_reply_info_quota(p, end, info);
 144                if (err < 0)
 145                        goto out_bad;
 146                /* pool namespace */
 147                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
 148                if (info->pool_ns_len > 0) {
 149                        ceph_decode_need(p, end, info->pool_ns_len, bad);
 150                        info->pool_ns_data = *p;
 151                        *p += info->pool_ns_len;
 152                }
 153
 154                /* btime */
 155                ceph_decode_need(p, end, sizeof(info->btime), bad);
 156                ceph_decode_copy(p, &info->btime, sizeof(info->btime));
 157
 158                /* change attribute */
 159                ceph_decode_64_safe(p, end, info->change_attr, bad);
 160
 161                /* dir pin */
 162                if (struct_v >= 2) {
 163                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
 164                } else {
 165                        info->dir_pin = -ENODATA;
 166                }
 167
 168                /* snapshot birth time, remains zero for v<=2 */
 169                if (struct_v >= 3) {
 170                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
 171                        ceph_decode_copy(p, &info->snap_btime,
 172                                         sizeof(info->snap_btime));
 173                } else {
 174                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
 175                }
 176
 177                *p = end;
 178        } else {
 179                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
 180                        ceph_decode_64_safe(p, end, info->inline_version, bad);
 181                        ceph_decode_32_safe(p, end, info->inline_len, bad);
 182                        ceph_decode_need(p, end, info->inline_len, bad);
 183                        info->inline_data = *p;
 184                        *p += info->inline_len;
 185                } else
 186                        info->inline_version = CEPH_INLINE_NONE;
 187
 188                if (features & CEPH_FEATURE_MDS_QUOTA) {
 189                        err = parse_reply_info_quota(p, end, info);
 190                        if (err < 0)
 191                                goto out_bad;
 192                } else {
 193                        info->max_bytes = 0;
 194                        info->max_files = 0;
 195                }
 196
 197                info->pool_ns_len = 0;
 198                info->pool_ns_data = NULL;
 199                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
 200                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
 201                        if (info->pool_ns_len > 0) {
 202                                ceph_decode_need(p, end, info->pool_ns_len, bad);
 203                                info->pool_ns_data = *p;
 204                                *p += info->pool_ns_len;
 205                        }
 206                }
 207
 208                if (features & CEPH_FEATURE_FS_BTIME) {
 209                        ceph_decode_need(p, end, sizeof(info->btime), bad);
 210                        ceph_decode_copy(p, &info->btime, sizeof(info->btime));
 211                        ceph_decode_64_safe(p, end, info->change_attr, bad);
 212                }
 213
 214                info->dir_pin = -ENODATA;
 215                /* info->snap_btime remains zero */
 216        }
 217        return 0;
 218bad:
 219        err = -EIO;
 220out_bad:
 221        return err;
 222}
 223
 224static int parse_reply_info_dir(void **p, void *end,
 225                                struct ceph_mds_reply_dirfrag **dirfrag,
 226                                u64 features)
 227{
 228        if (features == (u64)-1) {
 229                u8 struct_v, struct_compat;
 230                u32 struct_len;
 231                ceph_decode_8_safe(p, end, struct_v, bad);
 232                ceph_decode_8_safe(p, end, struct_compat, bad);
 233                /* struct_v is expected to be >= 1. we only understand
 234                 * encoding whose struct_compat == 1. */
 235                if (!struct_v || struct_compat != 1)
 236                        goto bad;
 237                ceph_decode_32_safe(p, end, struct_len, bad);
 238                ceph_decode_need(p, end, struct_len, bad);
 239                end = *p + struct_len;
 240        }
 241
 242        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
 243        *dirfrag = *p;
 244        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
 245        if (unlikely(*p > end))
 246                goto bad;
 247        if (features == (u64)-1)
 248                *p = end;
 249        return 0;
 250bad:
 251        return -EIO;
 252}
 253
 254static int parse_reply_info_lease(void **p, void *end,
 255                                  struct ceph_mds_reply_lease **lease,
 256                                  u64 features)
 257{
 258        if (features == (u64)-1) {
 259                u8 struct_v, struct_compat;
 260                u32 struct_len;
 261                ceph_decode_8_safe(p, end, struct_v, bad);
 262                ceph_decode_8_safe(p, end, struct_compat, bad);
 263                /* struct_v is expected to be >= 1. we only understand
 264                 * encoding whose struct_compat == 1. */
 265                if (!struct_v || struct_compat != 1)
 266                        goto bad;
 267                ceph_decode_32_safe(p, end, struct_len, bad);
 268                ceph_decode_need(p, end, struct_len, bad);
 269                end = *p + struct_len;
 270        }
 271
 272        ceph_decode_need(p, end, sizeof(**lease), bad);
 273        *lease = *p;
 274        *p += sizeof(**lease);
 275        if (features == (u64)-1)
 276                *p = end;
 277        return 0;
 278bad:
 279        return -EIO;
 280}
 281
 282/*
 283 * parse a normal reply, which may contain a (dir+)dentry and/or a
 284 * target inode.
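 *
 * Wire order, matching the decode below: when is_dentry is set we get
 * diri, dirfrag, a length-prefixed dentry name, and dlease; when
 * is_target is set, targeti follows.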
 285 */
 286static int parse_reply_info_trace(void **p, void *end,
 287                                  struct ceph_mds_reply_info_parsed *info,
 288                                  u64 features)
 289{
 290        int err;
 291
 292        if (info->head->is_dentry) {
 293                err = parse_reply_info_in(p, end, &info->diri, features);
 294                if (err < 0)
 295                        goto out_bad;
 296
 297                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
 298                if (err < 0)
 299                        goto out_bad;
 300
 301                ceph_decode_32_safe(p, end, info->dname_len, bad);
 302                ceph_decode_need(p, end, info->dname_len, bad);
 303                info->dname = *p;
 304                *p += info->dname_len;
 305
 306                err = parse_reply_info_lease(p, end, &info->dlease, features);
 307                if (err < 0)
 308                        goto out_bad;
 309        }
 310
 311        if (info->head->is_target) {
 312                err = parse_reply_info_in(p, end, &info->targeti, features);
 313                if (err < 0)
 314                        goto out_bad;
 315        }
 316
 317        if (unlikely(*p != end))
 318                goto bad;
 319        return 0;
 320
 321bad:
 322        err = -EIO;
 323out_bad:
 324        pr_err("problem parsing mds trace %d\n", err);
 325        return err;
 326}
 327
 328/*
 329 * parse readdir results
 330 */
 331static int parse_reply_info_readdir(void **p, void *end,
 332                                struct ceph_mds_reply_info_parsed *info,
 333                                u64 features)
 334{
 335        u32 num, i = 0;
 336        int err;
 337
 338        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
 339        if (err < 0)
 340                goto out_bad;
 341
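        /* u32 entry count followed by a u16 flags word */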
 342        ceph_decode_need(p, end, sizeof(num) + 2, bad);
 343        num = ceph_decode_32(p);
 344        {
 345                u16 flags = ceph_decode_16(p);
 346                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
 347                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
 348                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
 349                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
 350        }
 351        if (num == 0)
 352                goto done;
 353
 354        BUG_ON(!info->dir_entries);
 355        if ((unsigned long)(info->dir_entries + num) >
 356            (unsigned long)info->dir_entries + info->dir_buf_size) {
 357                pr_err("dir contents are larger than expected\n");
 358                WARN_ON(1);
 359                goto bad;
 360        }
 361
 362        info->dir_nr = num;
 363        while (num) {
 364                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
 365                /* dentry */
 366                ceph_decode_32_safe(p, end, rde->name_len, bad);
 367                ceph_decode_need(p, end, rde->name_len, bad);
 368                rde->name = *p;
 369                *p += rde->name_len;
 370                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
 371
 372                /* dentry lease */
 373                err = parse_reply_info_lease(p, end, &rde->lease, features);
 374                if (err)
 375                        goto out_bad;
 376                /* inode */
 377                err = parse_reply_info_in(p, end, &rde->inode, features);
 378                if (err < 0)
 379                        goto out_bad;
 380                /* ceph_readdir_prepopulate() will update it */
 381                rde->offset = 0;
 382                i++;
 383                num--;
 384        }
 385
 386done:
 387        if (*p != end)
 388                goto bad;
 389        return 0;
 390
 391bad:
 392        err = -EIO;
 393out_bad:
 394        pr_err("problem parsing dir contents %d\n", err);
 395        return err;
 396}
 397
 398/*
 399 * parse fcntl F_GETLK results
 400 */
 401static int parse_reply_info_filelock(void **p, void *end,
 402                                     struct ceph_mds_reply_info_parsed *info,
 403                                     u64 features)
 404{
 405        if (*p + sizeof(*info->filelock_reply) > end)
 406                goto bad;
 407
 408        info->filelock_reply = *p;
 409        *p += sizeof(*info->filelock_reply);
 410
 411        if (unlikely(*p != end))
 412                goto bad;
 413        return 0;
 414
 415bad:
 416        return -EIO;
 417}
 418
 419/*
 420 * parse create results
 421 */
 422static int parse_reply_info_create(void **p, void *end,
 423                                  struct ceph_mds_reply_info_parsed *info,
 424                                  u64 features)
 425{
 426        if (features == (u64)-1 ||
 427            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
 428                if (*p == end) {
 429                        info->has_create_ino = false;
 430                } else {
 431                        info->has_create_ino = true;
 432                        info->ino = ceph_decode_64(p);
 433                }
 434        }
 435
 436        if (unlikely(*p != end))
 437                goto bad;
 438        return 0;
 439
 440bad:
 441        return -EIO;
 442}
 443
 444/*
 445 * parse extra results
 446 */
 447static int parse_reply_info_extra(void **p, void *end,
 448                                  struct ceph_mds_reply_info_parsed *info,
 449                                  u64 features)
 450{
 451        u32 op = le32_to_cpu(info->head->op);
 452
 453        if (op == CEPH_MDS_OP_GETFILELOCK)
 454                return parse_reply_info_filelock(p, end, info, features);
 455        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
 456                return parse_reply_info_readdir(p, end, info, features);
 457        else if (op == CEPH_MDS_OP_CREATE)
 458                return parse_reply_info_create(p, end, info, features);
 459        else
 460                return -EIO;
 461}
 462
 463/*
 464 * parse entire mds reply
 465 */
 466static int parse_reply_info(struct ceph_msg *msg,
 467                            struct ceph_mds_reply_info_parsed *info,
 468                            u64 features)
 469{
 470        void *p, *end;
 471        u32 len;
 472        int err;
 473
 474        info->head = msg->front.iov_base;
 475        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
 476        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
 477
 478        /* trace */
 479        ceph_decode_32_safe(&p, end, len, bad);
 480        if (len > 0) {
 481                ceph_decode_need(&p, end, len, bad);
 482                err = parse_reply_info_trace(&p, p+len, info, features);
 483                if (err < 0)
 484                        goto out_bad;
 485        }
 486
 487        /* extra */
 488        ceph_decode_32_safe(&p, end, len, bad);
 489        if (len > 0) {
 490                ceph_decode_need(&p, end, len, bad);
 491                err = parse_reply_info_extra(&p, p+len, info, features);
 492                if (err < 0)
 493                        goto out_bad;
 494        }
 495
 496        /* snap blob */
 497        ceph_decode_32_safe(&p, end, len, bad);
 498        info->snapblob_len = len;
 499        info->snapblob = p;
 500        p += len;
 501
 502        if (p != end)
 503                goto bad;
 504        return 0;
 505
 506bad:
 507        err = -EIO;
 508out_bad:
 509        pr_err("mds parse_reply err %d\n", err);
 510        return err;
 511}
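
/*
 * For reference, the front section decoded above is, after the fixed-size
 * reply head: a u32-length-prefixed trace blob, a u32-length-prefixed
 * extra blob (op-specific: filelock, readdir or create results), and a
 * u32-length-prefixed snap blob.
 */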
 512
 513static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 514{
 515        if (!info->dir_entries)
 516                return;
 517        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
 518}
 519
 520
 521/*
 522 * sessions
 523 */
 524const char *ceph_session_state_name(int s)
 525{
 526        switch (s) {
 527        case CEPH_MDS_SESSION_NEW: return "new";
 528        case CEPH_MDS_SESSION_OPENING: return "opening";
 529        case CEPH_MDS_SESSION_OPEN: return "open";
 530        case CEPH_MDS_SESSION_HUNG: return "hung";
 531        case CEPH_MDS_SESSION_CLOSING: return "closing";
 532        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 533        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 534        case CEPH_MDS_SESSION_REJECTED: return "rejected";
 535        default: return "???";
 536        }
 537}
 538
 539static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 540{
 541        if (refcount_inc_not_zero(&s->s_ref)) {
 542                dout("mdsc get_session %p %d -> %d\n", s,
 543                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
 544                return s;
 545        } else {
 546                dout("mdsc get_session %p 0 -- FAIL\n", s);
 547                return NULL;
 548        }
 549}
 550
 551void ceph_put_mds_session(struct ceph_mds_session *s)
 552{
 553        dout("mdsc put_session %p %d -> %d\n", s,
 554             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
 555        if (refcount_dec_and_test(&s->s_ref)) {
 556                if (s->s_auth.authorizer)
 557                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
 558                kfree(s);
 559        }
 560}
 561
 562/*
 563 * called under mdsc->mutex
 564 */
 565struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 566                                                   int mds)
 567{
 568        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 569                return NULL;
 570        return get_session(mdsc->sessions[mds]);
 571}
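
/*
 * Minimal usage sketch (caller context assumed): look the session up
 * under mdsc->mutex, then drop the returned reference when done.
 *
 *	mutex_lock(&mdsc->mutex);
 *	s = __ceph_lookup_mds_session(mdsc, mds);
 *	mutex_unlock(&mdsc->mutex);
 *	if (s) {
 *		... use the session ...
 *		ceph_put_mds_session(s);
 *	}
 */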
 572
 573static bool __have_session(struct ceph_mds_client *mdsc, int mds)
 574{
 575        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 576                return false;
 577        else
 578                return true;
 579}
 580
 581static int __verify_registered_session(struct ceph_mds_client *mdsc,
 582                                       struct ceph_mds_session *s)
 583{
 584        if (s->s_mds >= mdsc->max_sessions ||
 585            mdsc->sessions[s->s_mds] != s)
 586                return -ENOENT;
 587        return 0;
 588}
 589
 590/*
 591 * create+register a new session for given mds.
 592 * called under mdsc->mutex.
 593 */
 594static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 595                                                 int mds)
 596{
 597        struct ceph_mds_session *s;
 598
 599        if (mds >= mdsc->mdsmap->m_num_mds)
 600                return ERR_PTR(-EINVAL);
 601
 602        s = kzalloc(sizeof(*s), GFP_NOFS);
 603        if (!s)
 604                return ERR_PTR(-ENOMEM);
 605
 606        if (mds >= mdsc->max_sessions) {
 607                int newmax = 1 << get_count_order(mds + 1);
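                /* next power of two that can index mds, e.g. mds 5 -> newmax 8 */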
 608                struct ceph_mds_session **sa;
 609
 610                dout("%s: realloc to %d\n", __func__, newmax);
 611                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
 612                if (!sa)
 613                        goto fail_realloc;
 614                if (mdsc->sessions) {
 615                        memcpy(sa, mdsc->sessions,
 616                               mdsc->max_sessions * sizeof(void *));
 617                        kfree(mdsc->sessions);
 618                }
 619                mdsc->sessions = sa;
 620                mdsc->max_sessions = newmax;
 621        }
 622
 623        dout("%s: mds%d\n", __func__, mds);
 624        s->s_mdsc = mdsc;
 625        s->s_mds = mds;
 626        s->s_state = CEPH_MDS_SESSION_NEW;
 627        s->s_ttl = 0;
 628        s->s_seq = 0;
 629        mutex_init(&s->s_mutex);
 630
 631        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 632
 633        spin_lock_init(&s->s_gen_ttl_lock);
 634        s->s_cap_gen = 1;
 635        s->s_cap_ttl = jiffies - 1;
 636
 637        spin_lock_init(&s->s_cap_lock);
 638        s->s_renew_requested = 0;
 639        s->s_renew_seq = 0;
 640        INIT_LIST_HEAD(&s->s_caps);
 641        s->s_nr_caps = 0;
 642        s->s_trim_caps = 0;
 643        refcount_set(&s->s_ref, 1);
 644        INIT_LIST_HEAD(&s->s_waiting);
 645        INIT_LIST_HEAD(&s->s_unsafe);
 646        s->s_num_cap_releases = 0;
 647        s->s_cap_reconnect = 0;
 648        s->s_cap_iterator = NULL;
 649        INIT_LIST_HEAD(&s->s_cap_releases);
 650        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
 651
 652        INIT_LIST_HEAD(&s->s_cap_flushing);
 653
 654        mdsc->sessions[mds] = s;
 655        atomic_inc(&mdsc->num_sessions);
 656        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 657
 658        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
 659                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 660
 661        return s;
 662
 663fail_realloc:
 664        kfree(s);
 665        return ERR_PTR(-ENOMEM);
 666}
 667
 668/*
 669 * called under mdsc->mutex
 670 */
 671static void __unregister_session(struct ceph_mds_client *mdsc,
 672                               struct ceph_mds_session *s)
 673{
 674        dout("__unregister_session mds%d %p\n", s->s_mds, s);
 675        BUG_ON(mdsc->sessions[s->s_mds] != s);
 676        mdsc->sessions[s->s_mds] = NULL;
 677        s->s_state = 0;
 678        ceph_con_close(&s->s_con);
 679        ceph_put_mds_session(s);
 680        atomic_dec(&mdsc->num_sessions);
 681}
 682
 683/*
 684 * drop session refs in request.
 685 *
 686 * should be last request ref, or hold mdsc->mutex
 687 */
 688static void put_request_session(struct ceph_mds_request *req)
 689{
 690        if (req->r_session) {
 691                ceph_put_mds_session(req->r_session);
 692                req->r_session = NULL;
 693        }
 694}
 695
 696void ceph_mdsc_release_request(struct kref *kref)
 697{
 698        struct ceph_mds_request *req = container_of(kref,
 699                                                    struct ceph_mds_request,
 700                                                    r_kref);
 701        destroy_reply_info(&req->r_reply_info);
 702        if (req->r_request)
 703                ceph_msg_put(req->r_request);
 704        if (req->r_reply)
 705                ceph_msg_put(req->r_reply);
 706        if (req->r_inode) {
 707                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
 708                /* avoid calling iput_final() in mds dispatch threads */
 709                ceph_async_iput(req->r_inode);
 710        }
 711        if (req->r_parent)
 712                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
 713        ceph_async_iput(req->r_target_inode);
 714        if (req->r_dentry)
 715                dput(req->r_dentry);
 716        if (req->r_old_dentry)
 717                dput(req->r_old_dentry);
 718        if (req->r_old_dentry_dir) {
 719                /*
 720                 * track (and drop pins for) r_old_dentry_dir
 721                 * separately, since r_old_dentry's d_parent may have
 722                 * changed between the dir mutex being dropped and
 723                 * this request being freed.
 724                 */
 725                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 726                                  CEPH_CAP_PIN);
 727                ceph_async_iput(req->r_old_dentry_dir);
 728        }
 729        kfree(req->r_path1);
 730        kfree(req->r_path2);
 731        if (req->r_pagelist)
 732                ceph_pagelist_release(req->r_pagelist);
 733        put_request_session(req);
 734        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 735        WARN_ON_ONCE(!list_empty(&req->r_wait));
 736        kfree(req);
 737}
 738
 739DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
 740
 741/*
 742 * lookup request, bump ref if found.
 743 *
 744 * called under mdsc->mutex.
 745 */
 746static struct ceph_mds_request *
 747lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
 748{
 749        struct ceph_mds_request *req;
 750
 751        req = lookup_request(&mdsc->request_tree, tid);
 752        if (req)
 753                ceph_mdsc_get_request(req);
 754
 755        return req;
 756}
 757
 758/*
 759 * Register an in-flight request, and assign a tid.  Link to the
 760 * directory we are modifying (if any).
 761 *
 762 * Called under mdsc->mutex.
 763 */
 764static void __register_request(struct ceph_mds_client *mdsc,
 765                               struct ceph_mds_request *req,
 766                               struct inode *dir)
 767{
 768        int ret = 0;
 769
 770        req->r_tid = ++mdsc->last_tid;
 771        if (req->r_num_caps) {
 772                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
 773                                        req->r_num_caps);
 774                if (ret < 0) {
 775                        pr_err("__register_request %p "
 776                               "failed to reserve caps: %d\n", req, ret);
 777                        /* set req->r_err to fail early from __do_request */
 778                        req->r_err = ret;
 779                        return;
 780                }
 781        }
 782        dout("__register_request %p tid %lld\n", req, req->r_tid);
 783        ceph_mdsc_get_request(req);
 784        insert_request(&mdsc->request_tree, req);
 785
 786        req->r_uid = current_fsuid();
 787        req->r_gid = current_fsgid();
 788
 789        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
 790                mdsc->oldest_tid = req->r_tid;
 791
 792        if (dir) {
 793                ihold(dir);
 794                req->r_unsafe_dir = dir;
 795        }
 796}
 797
 798static void __unregister_request(struct ceph_mds_client *mdsc,
 799                                 struct ceph_mds_request *req)
 800{
 801        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 802
 803        /* Never leave an unregistered request on an unsafe list! */
 804        list_del_init(&req->r_unsafe_item);
 805
 806        if (req->r_tid == mdsc->oldest_tid) {
 807                struct rb_node *p = rb_next(&req->r_node);
 808                mdsc->oldest_tid = 0;
 809                while (p) {
 810                        struct ceph_mds_request *next_req =
 811                                rb_entry(p, struct ceph_mds_request, r_node);
 812                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
 813                                mdsc->oldest_tid = next_req->r_tid;
 814                                break;
 815                        }
 816                        p = rb_next(p);
 817                }
 818        }
 819
 820        erase_request(&mdsc->request_tree, req);
 821
 822        if (req->r_unsafe_dir  &&
 823            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 824                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
 825                spin_lock(&ci->i_unsafe_lock);
 826                list_del_init(&req->r_unsafe_dir_item);
 827                spin_unlock(&ci->i_unsafe_lock);
 828        }
 829        if (req->r_target_inode &&
 830            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 831                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
 832                spin_lock(&ci->i_unsafe_lock);
 833                list_del_init(&req->r_unsafe_target_item);
 834                spin_unlock(&ci->i_unsafe_lock);
 835        }
 836
 837        if (req->r_unsafe_dir) {
 838                /* avoid calling iput_final() in mds dispatch threads */
 839                ceph_async_iput(req->r_unsafe_dir);
 840                req->r_unsafe_dir = NULL;
 841        }
 842
 843        complete_all(&req->r_safe_completion);
 844
 845        ceph_mdsc_put_request(req);
 846}
 847
 848/*
 849 * Walk back up the dentry tree until we hit a dentry representing a
 850 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 851 * when calling this) to ensure that the objects won't disappear while we're
 852 * working with them. Once we hit a candidate dentry, we attempt to take a
 853 * reference to it, and return that as the result.
 854 */
 855static struct inode *get_nonsnap_parent(struct dentry *dentry)
 856{
 857        struct inode *inode = NULL;
 858
 859        while (dentry && !IS_ROOT(dentry)) {
 860                inode = d_inode_rcu(dentry);
 861                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
 862                        break;
 863                dentry = dentry->d_parent;
 864        }
 865        if (inode)
 866                inode = igrab(inode);
 867        return inode;
 868}
 869
 870/*
 871 * Choose mds to send request to next.  If there is a hint set in the
 872 * request (e.g., due to a prior forward hint from the mds), use that.
 873 * Otherwise, consult frag tree and/or caps to identify the
 874 * appropriate mds.  If all else fails, choose randomly.
 875 *
 876 * Called under mdsc->mutex.
 877 */
 878static int __choose_mds(struct ceph_mds_client *mdsc,
 879                        struct ceph_mds_request *req)
 880{
 881        struct inode *inode;
 882        struct ceph_inode_info *ci;
 883        struct ceph_cap *cap;
 884        int mode = req->r_direct_mode;
 885        int mds = -1;
 886        u32 hash = req->r_direct_hash;
 887        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
 888
 889        /*
 890         * is there a specific mds we should try?  ignore hint if we have
 891         * no session and the mds is not up (active or recovering).
 892         */
 893        if (req->r_resend_mds >= 0 &&
 894            (__have_session(mdsc, req->r_resend_mds) ||
 895             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
 896                dout("choose_mds using resend_mds mds%d\n",
 897                     req->r_resend_mds);
 898                return req->r_resend_mds;
 899        }
 900
 901        if (mode == USE_RANDOM_MDS)
 902                goto random;
 903
 904        inode = NULL;
 905        if (req->r_inode) {
 906                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
 907                        inode = req->r_inode;
 908                        ihold(inode);
 909                } else {
 910                        /* req->r_dentry is non-null for LSSNAP request */
 911                        rcu_read_lock();
 912                        inode = get_nonsnap_parent(req->r_dentry);
 913                        rcu_read_unlock();
 914                        dout("__choose_mds using snapdir's parent %p\n", inode);
 915                }
 916        } else if (req->r_dentry) {
 917                /* ignore race with rename; old or new d_parent is okay */
 918                struct dentry *parent;
 919                struct inode *dir;
 920
 921                rcu_read_lock();
 922                parent = READ_ONCE(req->r_dentry->d_parent);
 923                dir = req->r_parent ? : d_inode_rcu(parent);
 924
 925                if (!dir || dir->i_sb != mdsc->fsc->sb) {
 926                        /*  not this fs or parent went negative */
 927                        inode = d_inode(req->r_dentry);
 928                        if (inode)
 929                                ihold(inode);
 930                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
 931                        /* direct snapped/virtual snapdir requests
 932                         * based on parent dir inode */
 933                        inode = get_nonsnap_parent(parent);
 934                        dout("__choose_mds using nonsnap parent %p\n", inode);
 935                } else {
 936                        /* dentry target */
 937                        inode = d_inode(req->r_dentry);
 938                        if (!inode || mode == USE_AUTH_MDS) {
 939                                /* dir + name */
 940                                inode = igrab(dir);
 941                                hash = ceph_dentry_hash(dir, req->r_dentry);
 942                                is_hash = true;
 943                        } else {
 944                                ihold(inode);
 945                        }
 946                }
 947                rcu_read_unlock();
 948        }
 949
 950        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
 951             (int)hash, mode);
 952        if (!inode)
 953                goto random;
 954        ci = ceph_inode(inode);
 955
 956        if (is_hash && S_ISDIR(inode->i_mode)) {
 957                struct ceph_inode_frag frag;
 958                int found;
 959
 960                ceph_choose_frag(ci, hash, &frag, &found);
 961                if (found) {
 962                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
 963                                u8 r;
 964
 965                                /* choose a random replica */
 966                                get_random_bytes(&r, 1);
 967                                r %= frag.ndist;
 968                                mds = frag.dist[r];
 969                                dout("choose_mds %p %llx.%llx "
 970                                     "frag %u mds%d (%d/%d)\n",
 971                                     inode, ceph_vinop(inode),
 972                                     frag.frag, mds,
 973                                     (int)r, frag.ndist);
 974                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 975                                    CEPH_MDS_STATE_ACTIVE)
 976                                        goto out;
 977                        }
 978
 979                        /* since this file/dir wasn't known to be
 980                         * replicated, we want to look for the
 981                         * authoritative mds. */
 982                        mode = USE_AUTH_MDS;
 983                        if (frag.mds >= 0) {
 984                                /* choose auth mds */
 985                                mds = frag.mds;
 986                                dout("choose_mds %p %llx.%llx "
 987                                     "frag %u mds%d (auth)\n",
 988                                     inode, ceph_vinop(inode), frag.frag, mds);
 989                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 990                                    CEPH_MDS_STATE_ACTIVE)
 991                                        goto out;
 992                        }
 993                }
 994        }
 995
 996        spin_lock(&ci->i_ceph_lock);
 997        cap = NULL;
 998        if (mode == USE_AUTH_MDS)
 999                cap = ci->i_auth_cap;
1000        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1001                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1002        if (!cap) {
1003                spin_unlock(&ci->i_ceph_lock);
1004                ceph_async_iput(inode);
1005                goto random;
1006        }
1007        mds = cap->session->s_mds;
1008        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
1009             inode, ceph_vinop(inode), mds,
1010             cap == ci->i_auth_cap ? "auth " : "", cap);
1011        spin_unlock(&ci->i_ceph_lock);
1012out:
1013        /* avoid calling iput_final() while holding mdsc->mutex or
1014         * in mds dispatch threads */
1015        ceph_async_iput(inode);
1016        return mds;
1017
1018random:
1019        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1020        dout("choose_mds chose random mds%d\n", mds);
1021        return mds;
1022}
1023
1024
1025/*
1026 * session messages
1027 */
1028static struct ceph_msg *create_session_msg(u32 op, u64 seq)
1029{
1030        struct ceph_msg *msg;
1031        struct ceph_mds_session_head *h;
1032
1033        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1034                           false);
1035        if (!msg) {
1036                pr_err("create_session_msg ENOMEM creating msg\n");
1037                return NULL;
1038        }
1039        h = msg->front.iov_base;
1040        h->op = cpu_to_le32(op);
1041        h->seq = cpu_to_le64(seq);
1042
1043        return msg;
1044}
1045
1046static void encode_supported_features(void **p, void *end)
1047{
1048        static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1049        static const size_t count = ARRAY_SIZE(bits);
1050
1051        if (count > 0) {
1052                size_t i;
1053                size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;
1054
1055                BUG_ON(*p + 4 + size > end);
1056                ceph_encode_32(p, size);
1057                memset(*p, 0, size);
1058                for (i = 0; i < count; i++)
1059                        ((unsigned char*)(*p))[bits[i] / 8] |= 1 << (bits[i] % 8);
1060                *p += size;
1061        } else {
1062                BUG_ON(*p + 4 > end);
1063                ceph_encode_32(p, 0);
1064        }
1065}
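
/*
 * Worked example of the size computation above: with a highest supported
 * feature bit of 8, size = (8 + 64) / 64 * 8 = 8 bytes; a highest bit of
 * 64 would round up to 16.  The bitmap is always a whole multiple of
 * 8 bytes, zero-padded beyond the last feature bit.
 */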
1066
1067/*
1068 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1069 * to include additional client metadata fields.
1070 */
1071static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1072{
1073        struct ceph_msg *msg;
1074        struct ceph_mds_session_head *h;
1075        int i = -1;
1076        int extra_bytes = 0;
1077        int metadata_key_count = 0;
1078        struct ceph_options *opt = mdsc->fsc->client->options;
1079        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1080        void *p, *end;
1081
1082        const char* metadata[][2] = {
1083                {"hostname", mdsc->nodename},
1084                {"kernel_version", init_utsname()->release},
1085                {"entity_id", opt->name ? : ""},
1086                {"root", fsopt->server_path ? : "/"},
1087                {NULL, NULL}
1088        };
1089
1090        /* Calculate serialized length of metadata */
1091        extra_bytes = 4;  /* map length */
1092        for (i = 0; metadata[i][0]; ++i) {
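                /* 8 = two u32 length prefixes, one per string */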
1093                extra_bytes += 8 + strlen(metadata[i][0]) +
1094                        strlen(metadata[i][1]);
1095                metadata_key_count++;
1096        }
1097        /* supported feature: u32 length + bitmap (8 bytes covers bits 0..63) */
1098        extra_bytes += 4 + 8;
1099
1100        /* Allocate the message */
1101        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1102                           GFP_NOFS, false);
1103        if (!msg) {
1104                pr_err("create_session_open_msg ENOMEM creating msg\n");
1105                return NULL;
1106        }
1107        p = msg->front.iov_base;
1108        end = p + msg->front.iov_len;
1109
1110        h = p;
1111        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1112        h->seq = cpu_to_le64(seq);
1113
1114        /*
1115         * Serialize client metadata into waiting buffer space, using
1116         * the format that userspace expects for map<string, string>
1117         *
1118         * ClientSession messages with metadata are v2; appending the
1119         * supported-feature bitmap below bumps the header to v3.  */
1120        msg->hdr.version = cpu_to_le16(3);
1121        msg->hdr.compat_version = cpu_to_le16(1);
1122
1123        /* The write pointer, following the session_head structure */
1124        p += sizeof(*h);
1125
1126        /* Number of entries in the map */
1127        ceph_encode_32(&p, metadata_key_count);
1128
1129        /* Two length-prefixed strings for each entry in the map */
1130        for (i = 0; metadata[i][0]; ++i) {
1131                size_t const key_len = strlen(metadata[i][0]);
1132                size_t const val_len = strlen(metadata[i][1]);
1133
1134                ceph_encode_32(&p, key_len);
1135                memcpy(p, metadata[i][0], key_len);
1136                p += key_len;
1137                ceph_encode_32(&p, val_len);
1138                memcpy(p, metadata[i][1], val_len);
1139                p += val_len;
1140        }
1141
1142        encode_supported_features(&p, end);
1143        msg->front.iov_len = p - msg->front.iov_base;
1144        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1145
1146        return msg;
1147}
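
/*
 * Resulting front layout, as encoded above: the session head, then
 *
 *	u32 n_entries;
 *	{ u32 key_len; key bytes; u32 val_len; val bytes; }  (per entry)
 *
 * followed by the supported-feature bitmap.
 */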
1148
1149/*
1150 * send session open request.
1151 *
1152 * called under mdsc->mutex
1153 */
1154static int __open_session(struct ceph_mds_client *mdsc,
1155                          struct ceph_mds_session *session)
1156{
1157        struct ceph_msg *msg;
1158        int mstate;
1159        int mds = session->s_mds;
1160
1161        /* wait for mds to go active? */
1162        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1163        dout("open_session to mds%d (%s)\n", mds,
1164             ceph_mds_state_name(mstate));
1165        session->s_state = CEPH_MDS_SESSION_OPENING;
1166        session->s_renew_requested = jiffies;
1167
1168        /* send connect message */
1169        msg = create_session_open_msg(mdsc, session->s_seq);
1170        if (!msg)
1171                return -ENOMEM;
1172        ceph_con_send(&session->s_con, msg);
1173        return 0;
1174}
1175
1176/*
1177 * open sessions for any export targets for the given mds
1178 *
1179 * called under mdsc->mutex
1180 */
1181static struct ceph_mds_session *
1182__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1183{
1184        struct ceph_mds_session *session;
1185
1186        session = __ceph_lookup_mds_session(mdsc, target);
1187        if (!session) {
1188                session = register_session(mdsc, target);
1189                if (IS_ERR(session))
1190                        return session;
1191        }
1192        if (session->s_state == CEPH_MDS_SESSION_NEW ||
1193            session->s_state == CEPH_MDS_SESSION_CLOSING)
1194                __open_session(mdsc, session);
1195
1196        return session;
1197}
1198
1199struct ceph_mds_session *
1200ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1201{
1202        struct ceph_mds_session *session;
1203
1204        dout("open_export_target_session to mds%d\n", target);
1205
1206        mutex_lock(&mdsc->mutex);
1207        session = __open_export_target_session(mdsc, target);
1208        mutex_unlock(&mdsc->mutex);
1209
1210        return session;
1211}
1212
1213static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1214                                          struct ceph_mds_session *session)
1215{
1216        struct ceph_mds_info *mi;
1217        struct ceph_mds_session *ts;
1218        int i, mds = session->s_mds;
1219
1220        if (mds >= mdsc->mdsmap->m_num_mds)
1221                return;
1222
1223        mi = &mdsc->mdsmap->m_info[mds];
1224        dout("open_export_target_sessions for mds%d (%d targets)\n",
1225             session->s_mds, mi->num_export_targets);
1226
1227        for (i = 0; i < mi->num_export_targets; i++) {
1228                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1229                if (!IS_ERR(ts))
1230                        ceph_put_mds_session(ts);
1231        }
1232}
1233
1234void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1235                                           struct ceph_mds_session *session)
1236{
1237        mutex_lock(&mdsc->mutex);
1238        __open_export_target_sessions(mdsc, session);
1239        mutex_unlock(&mdsc->mutex);
1240}
1241
1242/*
1243 * session caps
1244 */
1245
1246static void detach_cap_releases(struct ceph_mds_session *session,
1247                                struct list_head *target)
1248{
1249        lockdep_assert_held(&session->s_cap_lock);
1250
1251        list_splice_init(&session->s_cap_releases, target);
1252        session->s_num_cap_releases = 0;
1253        dout("detach_cap_releases mds%d\n", session->s_mds);
1254}
1255
1256static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1257                                 struct list_head *dispose)
1258{
1259        while (!list_empty(dispose)) {
1260                struct ceph_cap *cap;
1261                /* take the cap off the list and drop its reference */
1262                cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1263                list_del(&cap->session_caps);
1264                ceph_put_cap(mdsc, cap);
1265        }
1266}
1267
1268static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1269                                     struct ceph_mds_session *session)
1270{
1271        struct ceph_mds_request *req;
1272        struct rb_node *p;
1273
1274        dout("cleanup_session_requests mds%d\n", session->s_mds);
1275        mutex_lock(&mdsc->mutex);
1276        while (!list_empty(&session->s_unsafe)) {
1277                req = list_first_entry(&session->s_unsafe,
1278                                       struct ceph_mds_request, r_unsafe_item);
1279                pr_warn_ratelimited(" dropping unsafe request %llu\n",
1280                                    req->r_tid);
1281                __unregister_request(mdsc, req);
1282        }
1283        /* zero r_attempts, so kick_requests() will re-send requests */
1284        p = rb_first(&mdsc->request_tree);
1285        while (p) {
1286                req = rb_entry(p, struct ceph_mds_request, r_node);
1287                p = rb_next(p);
1288                if (req->r_session &&
1289                    req->r_session->s_mds == session->s_mds)
1290                        req->r_attempts = 0;
1291        }
1292        mutex_unlock(&mdsc->mutex);
1293}
1294
1295/*
1296 * Helper to safely iterate over all caps associated with a session, with
1297 * special care taken to handle a racing __ceph_remove_cap().
1298 *
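 * While a callback runs we drop s_cap_lock; s_cap_iterator marks the cap
 * we are parked on, so a racing __ceph_remove_cap() only clears cap->ci
 * and leaves the unlinking to be finished here once the lock is retaken.
 *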
1299 * Caller must hold session s_mutex.
1300 */
1301int ceph_iterate_session_caps(struct ceph_mds_session *session,
1302                              int (*cb)(struct inode *, struct ceph_cap *,
1303                                        void *), void *arg)
1304{
1305        struct list_head *p;
1306        struct ceph_cap *cap;
1307        struct inode *inode, *last_inode = NULL;
1308        struct ceph_cap *old_cap = NULL;
1309        int ret;
1310
1311        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1312        spin_lock(&session->s_cap_lock);
1313        p = session->s_caps.next;
1314        while (p != &session->s_caps) {
1315                cap = list_entry(p, struct ceph_cap, session_caps);
1316                inode = igrab(&cap->ci->vfs_inode);
1317                if (!inode) {
1318                        p = p->next;
1319                        continue;
1320                }
1321                session->s_cap_iterator = cap;
1322                spin_unlock(&session->s_cap_lock);
1323
1324                if (last_inode) {
1325                        /* avoid calling iput_final() while holding
1326                         * s_mutex or in mds dispatch threads */
1327                        ceph_async_iput(last_inode);
1328                        last_inode = NULL;
1329                }
1330                if (old_cap) {
1331                        ceph_put_cap(session->s_mdsc, old_cap);
1332                        old_cap = NULL;
1333                }
1334
1335                ret = cb(inode, cap, arg);
1336                last_inode = inode;
1337
1338                spin_lock(&session->s_cap_lock);
1339                p = p->next;
1340                if (!cap->ci) {
1341                        dout("iterate_session_caps  finishing cap %p removal\n",
1342                             cap);
1343                        BUG_ON(cap->session != session);
1344                        cap->session = NULL;
1345                        list_del_init(&cap->session_caps);
1346                        session->s_nr_caps--;
1347                        if (cap->queue_release)
1348                                __ceph_queue_cap_release(session, cap);
1349                        else
1350                                old_cap = cap;  /* put_cap it w/o locks held */
1351                }
1352                if (ret < 0)
1353                        goto out;
1354        }
1355        ret = 0;
1356out:
1357        session->s_cap_iterator = NULL;
1358        spin_unlock(&session->s_cap_lock);
1359
1360        ceph_async_iput(last_inode);
1361        if (old_cap)
1362                ceph_put_cap(session->s_mdsc, old_cap);
1363
1364        return ret;
1365}
1366
1367static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1368                                  void *arg)
1369{
1370        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1371        struct ceph_inode_info *ci = ceph_inode(inode);
1372        LIST_HEAD(to_remove);
1373        bool drop = false;
1374        bool invalidate = false;
1375
1376        dout("removing cap %p, ci is %p, inode is %p\n",
1377             cap, ci, &ci->vfs_inode);
1378        spin_lock(&ci->i_ceph_lock);
1379        if (cap->mds_wanted | cap->issued)
1380                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1381        __ceph_remove_cap(cap, false);
1382        if (!ci->i_auth_cap) {
1383                struct ceph_cap_flush *cf;
1384                struct ceph_mds_client *mdsc = fsc->mdsc;
1385
1386                if (ci->i_wrbuffer_ref > 0 &&
1387                    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1388                        invalidate = true;
1389
1390                while (!list_empty(&ci->i_cap_flush_list)) {
1391                        cf = list_first_entry(&ci->i_cap_flush_list,
1392                                              struct ceph_cap_flush, i_list);
1393                        list_move(&cf->i_list, &to_remove);
1394                }
1395
1396                spin_lock(&mdsc->cap_dirty_lock);
1397
1398                list_for_each_entry(cf, &to_remove, i_list)
1399                        list_del(&cf->g_list);
1400
1401                if (!list_empty(&ci->i_dirty_item)) {
1402                        pr_warn_ratelimited(
1403                                " dropping dirty %s state for %p %lld\n",
1404                                ceph_cap_string(ci->i_dirty_caps),
1405                                inode, ceph_ino(inode));
1406                        ci->i_dirty_caps = 0;
1407                        list_del_init(&ci->i_dirty_item);
1408                        drop = true;
1409                }
1410                if (!list_empty(&ci->i_flushing_item)) {
1411                        pr_warn_ratelimited(
1412                                " dropping dirty+flushing %s state for %p %lld\n",
1413                                ceph_cap_string(ci->i_flushing_caps),
1414                                inode, ceph_ino(inode));
1415                        ci->i_flushing_caps = 0;
1416                        list_del_init(&ci->i_flushing_item);
1417                        mdsc->num_cap_flushing--;
1418                        drop = true;
1419                }
1420                spin_unlock(&mdsc->cap_dirty_lock);
1421
1422                if (atomic_read(&ci->i_filelock_ref) > 0) {
1423                        /* make further file lock syscall return -EIO */
1424                        ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1425                        pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1426                                            inode, ceph_ino(inode));
1427                }
1428
1429                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1430                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1431                        ci->i_prealloc_cap_flush = NULL;
1432                }
1433
1434                if (drop &&
1435                    ci->i_wrbuffer_ref_head == 0 &&
1436                    ci->i_wr_ref == 0 &&
1437                    ci->i_dirty_caps == 0 &&
1438                    ci->i_flushing_caps == 0) {
1439                        ceph_put_snap_context(ci->i_head_snapc);
1440                        ci->i_head_snapc = NULL;
1441                }
1442        }
1443        spin_unlock(&ci->i_ceph_lock);
1444        while (!list_empty(&to_remove)) {
1445                struct ceph_cap_flush *cf;
1446                cf = list_first_entry(&to_remove,
1447                                      struct ceph_cap_flush, i_list);
1448                list_del(&cf->i_list);
1449                ceph_free_cap_flush(cf);
1450        }
1451
1452        wake_up_all(&ci->i_cap_wq);
1453        if (invalidate)
1454                ceph_queue_invalidate(inode);
1455        if (drop)
1456                iput(inode);
1457        return 0;
1458}
1459
1460/*
1461 * caller must hold session s_mutex
1462 */
1463static void remove_session_caps(struct ceph_mds_session *session)
1464{
1465        struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1466        struct super_block *sb = fsc->sb;
1467        LIST_HEAD(dispose);
1468
1469        dout("remove_session_caps on %p\n", session);
1470        ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1471
1472        wake_up_all(&fsc->mdsc->cap_flushing_wq);
1473
1474        spin_lock(&session->s_cap_lock);
1475        if (session->s_nr_caps > 0) {
1476                struct inode *inode;
1477                struct ceph_cap *cap, *prev = NULL;
1478                struct ceph_vino vino;
1479                /*
1480                 * iterate_session_caps() skips inodes that are being
1481                 * deleted, so we need to wait until deletions are
1482                 * complete.  __wait_on_freeing_inode() is designed for
1483                 * the job, but it is not exported, so use the inode
1484                 * lookup function to reach it.
1485                 */
1486                while (!list_empty(&session->s_caps)) {
1487                        cap = list_entry(session->s_caps.next,
1488                                         struct ceph_cap, session_caps);
1489                        if (cap == prev)
1490                                break;
1491                        prev = cap;
1492                        vino = cap->ci->i_vino;
1493                        spin_unlock(&session->s_cap_lock);
1494
1495                        inode = ceph_find_inode(sb, vino);
1496                        /* avoid calling iput_final() while holding s_mutex */
1497                        ceph_async_iput(inode);
1498
1499                        spin_lock(&session->s_cap_lock);
1500                }
1501        }
1502
1503        /* detach queued cap releases; they are disposed of after we unlock */
1504        detach_cap_releases(session, &dispose);
1505
1506        BUG_ON(session->s_nr_caps > 0);
1507        BUG_ON(!list_empty(&session->s_cap_flushing));
1508        spin_unlock(&session->s_cap_lock);
1509        dispose_cap_releases(session->s_mdsc, &dispose);
1510}
1511
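    /*
     * Events passed to wake_up_session_cb() when iterating over a
     * session's caps with ceph_iterate_session_caps().
     */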
1512enum {
1513        RECONNECT,
1514        RENEWCAPS,
1515        FORCE_RO,
1516};
1517
1518/*
1519 * wake up any threads waiting on this session's caps.  if the cap is
1520 * old (didn't get renewed on the client reconnect), downgrade it now.
1521 *
1522 * caller must hold s_mutex.
1523 */
1524static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1525                              void *arg)
1526{
1527        struct ceph_inode_info *ci = ceph_inode(inode);
1528        unsigned long ev = (unsigned long)arg;
1529
1530        if (ev == RECONNECT) {
1531                spin_lock(&ci->i_ceph_lock);
1532                ci->i_wanted_max_size = 0;
1533                ci->i_requested_max_size = 0;
1534                spin_unlock(&ci->i_ceph_lock);
1535        } else if (ev == RENEWCAPS) {
1536                if (cap->cap_gen < cap->session->s_cap_gen) {
1537                        /* mds did not re-issue stale cap */
1538                        spin_lock(&ci->i_ceph_lock);
1539                        cap->issued = cap->implemented = CEPH_CAP_PIN;
1540                        /* make sure mds knows what we want */
1541                        if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
1542                                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1543                        spin_unlock(&ci->i_ceph_lock);
1544                }
1545        } else if (ev == FORCE_RO) {
1546        }
1547        wake_up_all(&ci->i_cap_wq);
1548        return 0;
1549}
1550
1551static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1552{
1553        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1554        ceph_iterate_session_caps(session, wake_up_session_cb,
1555                                  (void *)(unsigned long)ev);
1556}
1557
1558/*
1559 * Send periodic message to MDS renewing all currently held caps.  The
1560 * ack will reset the expiration for all caps from this session.
1561 *
1562 * caller holds s_mutex
1563 */
1564static int send_renew_caps(struct ceph_mds_client *mdsc,
1565                           struct ceph_mds_session *session)
1566{
1567        struct ceph_msg *msg;
1568        int state;
1569
1570        if (time_after_eq(jiffies, session->s_cap_ttl) &&
1571            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1572                pr_info("mds%d caps stale\n", session->s_mds);
1573        session->s_renew_requested = jiffies;
1574
1575        /* do not try to renew caps until a recovering mds has reconnected
1576         * with its clients. */
1577        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1578        if (state < CEPH_MDS_STATE_RECONNECT) {
1579                dout("send_renew_caps ignoring mds%d (%s)\n",
1580                     session->s_mds, ceph_mds_state_name(state));
1581                return 0;
1582        }
1583
1584        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1585                ceph_mds_state_name(state));
1586        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1587                                 ++session->s_renew_seq);
1588        if (!msg)
1589                return -ENOMEM;
1590        ceph_con_send(&session->s_con, msg);
1591        return 0;
1592}
1593
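    /*
     * Acknowledge a CEPH_SESSION_FLUSHMSG from the MDS by echoing its
     * sequence number back to it.
     */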
1594static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1595                             struct ceph_mds_session *session, u64 seq)
1596{
1597        struct ceph_msg *msg;
1598
1599        dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1600             session->s_mds, ceph_session_state_name(session->s_state), seq);
1601        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1602        if (!msg)
1603                return -ENOMEM;
1604        ceph_con_send(&session->s_con, msg);
1605        return 0;
1606}
1607
1608
1609/*
1610 * Note the new cap ttl, and any transition from stale back to fresh.
1611 *
1612 * Called under session->s_mutex
1613 */
1614static void renewed_caps(struct ceph_mds_client *mdsc,
1615                         struct ceph_mds_session *session, int is_renew)
1616{
1617        int was_stale;
1618        int wake = 0;
1619
1620        spin_lock(&session->s_cap_lock);
1621        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1622
1623        session->s_cap_ttl = session->s_renew_requested +
1624                mdsc->mdsmap->m_session_timeout*HZ;
1625
1626        if (was_stale) {
1627                if (time_before(jiffies, session->s_cap_ttl)) {
1628                        pr_info("mds%d caps renewed\n", session->s_mds);
1629                        wake = 1;
1630                } else {
1631                        pr_info("mds%d caps still stale\n", session->s_mds);
1632                }
1633        }
1634        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1635             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1636             time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1637        spin_unlock(&session->s_cap_lock);
1638
1639        if (wake)
1640                wake_up_session_caps(session, RENEWCAPS);
1641}
1642
1643/*
1644 * send a session close request
1645 */
1646static int request_close_session(struct ceph_mds_client *mdsc,
1647                                 struct ceph_mds_session *session)
1648{
1649        struct ceph_msg *msg;
1650
1651        dout("request_close_session mds%d state %s seq %lld\n",
1652             session->s_mds, ceph_session_state_name(session->s_state),
1653             session->s_seq);
1654        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1655        if (!msg)
1656                return -ENOMEM;
1657        ceph_con_send(&session->s_con, msg);
1658        return 1;
1659}
1660
1661/*
1662 * Called with s_mutex held.
1663 */
1664static int __close_session(struct ceph_mds_client *mdsc,
1665                         struct ceph_mds_session *session)
1666{
1667        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1668                return 0;
1669        session->s_state = CEPH_MDS_SESSION_CLOSING;
1670        return request_close_session(mdsc, session);
1671}
1672
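    /*
     * If the dentry is a directory whose children are all negative,
     * prune them.  Returns true when no positive children were found
     * (trivially so for non-directories).
     */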
1673static bool drop_negative_children(struct dentry *dentry)
1674{
1675        struct dentry *child;
1676        bool all_negative = true;
1677
1678        if (!d_is_dir(dentry))
1679                goto out;
1680
1681        spin_lock(&dentry->d_lock);
1682        list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1683                if (d_really_is_positive(child)) {
1684                        all_negative = false;
1685                        break;
1686                }
1687        }
1688        spin_unlock(&dentry->d_lock);
1689
1690        if (all_negative)
1691                shrink_dcache_parent(dentry);
1692out:
1693        return all_negative;
1694}
1695
1696/*
1697 * Trim old(er) caps.
1698 *
1699 * Because we can't cache an inode without one or more caps, we do
1700 * this indirectly: if a cap is unused, we prune its aliases, at which
1701 * point the inode will hopefully get dropped too.
1702 *
1703 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1704 * memory pressure from the MDS, though, so it needn't be perfect.
1705 */
1706static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1707{
1708        struct ceph_mds_session *session = arg;
1709        struct ceph_inode_info *ci = ceph_inode(inode);
1710        int used, wanted, oissued, mine;
1711
1712        if (session->s_trim_caps <= 0)
1713                return -1;
1714
1715        spin_lock(&ci->i_ceph_lock);
1716        mine = cap->issued | cap->implemented;
1717        used = __ceph_caps_used(ci);
1718        wanted = __ceph_caps_file_wanted(ci);
1719        oissued = __ceph_caps_issued_other(ci, cap);
1720
1721        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1722             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1723             ceph_cap_string(used), ceph_cap_string(wanted));
1724        if (cap == ci->i_auth_cap) {
1725                if (ci->i_dirty_caps || ci->i_flushing_caps ||
1726                    !list_empty(&ci->i_cap_snaps))
1727                        goto out;
1728                if ((used | wanted) & CEPH_CAP_ANY_WR)
1729                        goto out;
1730                /* Note: it's possible that i_filelock_ref becomes non-zero
1731                 * after dropping auth caps. It doesn't hurt because reply
1732                 * of lock mds request will re-add auth caps. */
1733                if (atomic_read(&ci->i_filelock_ref) > 0)
1734                        goto out;
1735        }
1736        /* The inode has cached pages, but it's no longer used.
1737         * We can safely drop it. */
1738        if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1739            !(oissued & CEPH_CAP_FILE_CACHE)) {
1740                used = 0;
1741                oissued = 0;
1742        }
1743        if ((used | wanted) & ~oissued & mine)
1744                goto out;   /* we need these caps */
1745
1746        if (oissued) {
1747                /* we aren't the only cap.. just remove us */
1748                __ceph_remove_cap(cap, true);
1749                session->s_trim_caps--;
1750        } else {
1751                struct dentry *dentry;
1752                /* try dropping referring dentries */
1753                spin_unlock(&ci->i_ceph_lock);
1754                dentry = d_find_any_alias(inode);
1755                if (dentry && drop_negative_children(dentry)) {
1756                        int count;
1757                        dput(dentry);
1758                        d_prune_aliases(inode);
1759                        count = atomic_read(&inode->i_count);
1760                        if (count == 1)
1761                                session->s_trim_caps--;
1762                        dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1763                             inode, cap, count);
1764                } else {
1765                        dput(dentry);
1766                }
1767                return 0;
1768        }
1769
1770out:
1771        spin_unlock(&ci->i_ceph_lock);
1772        return 0;
1773}
1774
1775/*
1776 * Trim session cap count down to some max number.
1777 */
1778int ceph_trim_caps(struct ceph_mds_client *mdsc,
1779                   struct ceph_mds_session *session,
1780                   int max_caps)
1781{
1782        int trim_caps = session->s_nr_caps - max_caps;
1783
1784        dout("trim_caps mds%d start: %d / %d, trim %d\n",
1785             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1786        if (trim_caps > 0) {
1787                session->s_trim_caps = trim_caps;
1788                ceph_iterate_session_caps(session, trim_caps_cb, session);
1789                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1790                     session->s_mds, session->s_nr_caps, max_caps,
1791             trim_caps - session->s_trim_caps);
1792                session->s_trim_caps = 0;
1793        }
1794
1795        ceph_flush_cap_releases(mdsc, session);
1796        return 0;
1797}
1798
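    /*
     * Return nonzero once no cap flush with a tid <= want_flush_tid
     * remains on the global cap_flush_list, i.e. once the wait in
     * wait_caps_flush() below may finish.
     */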
1799static int check_caps_flush(struct ceph_mds_client *mdsc,
1800                            u64 want_flush_tid)
1801{
1802        int ret = 1;
1803
1804        spin_lock(&mdsc->cap_dirty_lock);
1805        if (!list_empty(&mdsc->cap_flush_list)) {
1806                struct ceph_cap_flush *cf =
1807                        list_first_entry(&mdsc->cap_flush_list,
1808                                         struct ceph_cap_flush, g_list);
1809                if (cf->tid <= want_flush_tid) {
1810                        dout("check_caps_flush still flushing tid "
1811                             "%llu <= %llu\n", cf->tid, want_flush_tid);
1812                        ret = 0;
1813                }
1814        }
1815        spin_unlock(&mdsc->cap_dirty_lock);
1816        return ret;
1817}
1818
1819/*
1820 * wait for outstanding cap flushes to complete.
1821 *
1822 * Returns once we've flushed through want_flush_tid.
1823 */
1824static void wait_caps_flush(struct ceph_mds_client *mdsc,
1825                            u64 want_flush_tid)
1826{
1827        dout("check_caps_flush want %llu\n", want_flush_tid);
1828
1829        wait_event(mdsc->cap_flushing_wq,
1830                   check_caps_flush(mdsc, want_flush_tid));
1831
1832        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1833}
1834
1835/*
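     * Send all queued cap releases to the MDS, batching up to
     * CEPH_CAPS_PER_RELEASE entries per CAPRELEASE message and
     * appending the current OSD epoch barrier to each message.
     *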
1836 * called under s_mutex
1837 */
1838static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1839                                   struct ceph_mds_session *session)
1840{
1841        struct ceph_msg *msg = NULL;
1842        struct ceph_mds_cap_release *head;
1843        struct ceph_mds_cap_item *item;
1844        struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1845        struct ceph_cap *cap;
1846        LIST_HEAD(tmp_list);
1847        int num_cap_releases;
1848        __le32 barrier, *cap_barrier;
1849
1850        down_read(&osdc->lock);
1851        barrier = cpu_to_le32(osdc->epoch_barrier);
1852        up_read(&osdc->lock);
1853
1854        spin_lock(&session->s_cap_lock);
1855again:
1856        list_splice_init(&session->s_cap_releases, &tmp_list);
1857        num_cap_releases = session->s_num_cap_releases;
1858        session->s_num_cap_releases = 0;
1859        spin_unlock(&session->s_cap_lock);
1860
1861        while (!list_empty(&tmp_list)) {
1862                if (!msg) {
1863                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1864                                        PAGE_SIZE, GFP_NOFS, false);
1865                        if (!msg)
1866                                goto out_err;
1867                        head = msg->front.iov_base;
1868                        head->num = cpu_to_le32(0);
1869                        msg->front.iov_len = sizeof(*head);
1870
1871                        msg->hdr.version = cpu_to_le16(2);
1872                        msg->hdr.compat_version = cpu_to_le16(1);
1873                }
1874
1875                cap = list_first_entry(&tmp_list, struct ceph_cap,
1876                                        session_caps);
1877                list_del(&cap->session_caps);
1878                num_cap_releases--;
1879
1880                head = msg->front.iov_base;
1881                put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
1882                                   &head->num);
1883                item = msg->front.iov_base + msg->front.iov_len;
1884                item->ino = cpu_to_le64(cap->cap_ino);
1885                item->cap_id = cpu_to_le64(cap->cap_id);
1886                item->migrate_seq = cpu_to_le32(cap->mseq);
1887                item->seq = cpu_to_le32(cap->issue_seq);
1888                msg->front.iov_len += sizeof(*item);
1889
1890                ceph_put_cap(mdsc, cap);
1891
1892                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1893                        /* append cap_barrier field */
1894                        cap_barrier = msg->front.iov_base + msg->front.iov_len;
1895                        *cap_barrier = barrier;
1896                        msg->front.iov_len += sizeof(*cap_barrier);
1897
1898                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1899                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1900                        ceph_con_send(&session->s_con, msg);
1901                        msg = NULL;
1902                }
1903        }
1904
1905        BUG_ON(num_cap_releases != 0);
1906
1907        spin_lock(&session->s_cap_lock);
1908        if (!list_empty(&session->s_cap_releases))
1909                goto again;
1910        spin_unlock(&session->s_cap_lock);
1911
1912        if (msg) {
1913                /* append cap_barrier field */
1914                cap_barrier = msg->front.iov_base + msg->front.iov_len;
1915                *cap_barrier = barrier;
1916                msg->front.iov_len += sizeof(*cap_barrier);
1917
1918                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1919                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1920                ceph_con_send(&session->s_con, msg);
1921        }
1922        return;
1923out_err:
1924        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1925                session->s_mds);
1926        spin_lock(&session->s_cap_lock);
1927        list_splice(&tmp_list, &session->s_cap_releases);
1928        session->s_num_cap_releases += num_cap_releases;
1929        spin_unlock(&session->s_cap_lock);
1930}
1931
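    /*
     * Work function: send any queued cap releases while the session is
     * still open (or hung), then drop the session reference taken when
     * the work was queued.
     */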
1932static void ceph_cap_release_work(struct work_struct *work)
1933{
1934        struct ceph_mds_session *session =
1935                container_of(work, struct ceph_mds_session, s_cap_release_work);
1936
1937        mutex_lock(&session->s_mutex);
1938        if (session->s_state == CEPH_MDS_SESSION_OPEN ||
1939            session->s_state == CEPH_MDS_SESSION_HUNG)
1940                ceph_send_cap_releases(session->s_mdsc, session);
1941        mutex_unlock(&session->s_mutex);
1942        ceph_put_mds_session(session);
1943}
1944
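    /*
     * Queue the session's cap release work on the cap_wq workqueue.  A
     * session reference is taken for the work item; it is dropped by
     * ceph_cap_release_work(), or here if the work was already queued.
     */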
1945void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
1946                             struct ceph_mds_session *session)
1947{
1948        if (mdsc->stopping)
1949                return;
1950
1951        get_session(session);
1952        if (queue_work(mdsc->fsc->cap_wq,
1953                       &session->s_cap_release_work)) {
1954                dout("cap release work queued\n");
1955        } else {
1956                ceph_put_mds_session(session);
1957                dout("failed to queue cap release work\n");
1958        }
1959}
1960
1961/*
1962 * caller holds session->s_cap_lock
1963 */
1964void __ceph_queue_cap_release(struct ceph_mds_session *session,
1965                              struct ceph_cap *cap)
1966{
1967        list_add_tail(&cap->session_caps, &session->s_cap_releases);
1968        session->s_num_cap_releases++;
1969
1970        if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
1971                ceph_flush_cap_releases(session->s_mdsc, session);
1972}
1973
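    /*
     * Work function: trim dentries to release unused caps, requeueing
     * ourselves if ceph_trim_dentries() reports there is more to do.
     */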
1974static void ceph_cap_reclaim_work(struct work_struct *work)
1975{
1976        struct ceph_mds_client *mdsc =
1977                container_of(work, struct ceph_mds_client, cap_reclaim_work);
1978        int ret = ceph_trim_dentries(mdsc);
1979        if (ret == -EAGAIN)
1980                ceph_queue_cap_reclaim_work(mdsc);
1981}
1982
1983void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
1984{
1985        if (mdsc->stopping)
1986                return;
1987
1988        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
1989                dout("caps reclaim work queued\n");
1990        } else {
1991                dout("failed to queue caps reclaim work\n");
1992        }
1993}
1994
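    /*
     * Record that nr more caps are likely reclaimable; once roughly
     * CEPH_CAPS_PER_RELEASE of them accumulate, reset the counter and
     * kick the reclaim work.
     */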
1995void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
1996{
1997        int val;
1998        if (!nr)
1999                return;
2000        val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2001        if (!(val % CEPH_CAPS_PER_RELEASE)) {
2002                atomic_set(&mdsc->cap_reclaim_pending, 0);
2003                ceph_queue_cap_reclaim_work(mdsc);
2004        }
2005}
2006
2007/*
2008 * requests
2009 */
2010
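    /*
     * Size the readdir reply buffer: estimate the entry count from the
     * directory's i_files + i_subdirs (clamped by the max_readdir mount
     * option), then allocate the largest page order we can get,
     * backing off on allocation failure.
     */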
2011int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2012                                    struct inode *dir)
2013{
2014        struct ceph_inode_info *ci = ceph_inode(dir);
2015        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2016        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2017        size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2018        int order, num_entries;
2019
2020        spin_lock(&ci->i_ceph_lock);
2021        num_entries = ci->i_files + ci->i_subdirs;
2022        spin_unlock(&ci->i_ceph_lock);
2023        num_entries = max(num_entries, 1);
2024        num_entries = min(num_entries, opt->max_readdir);
2025
2026        order = get_order(size * num_entries);
2027        while (order >= 0) {
2028                rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2029                                                             __GFP_NOWARN,
2030                                                             order);
2031                if (rinfo->dir_entries)
2032                        break;
2033                order--;
2034        }
2035        if (!rinfo->dir_entries)
2036                return -ENOMEM;
2037
2038        num_entries = (PAGE_SIZE << order) / size;
2039        num_entries = min(num_entries, opt->max_readdir);
2040
2041        rinfo->dir_buf_size = PAGE_SIZE << order;
2042        req->r_num_caps = num_entries + 1;
2043        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2044        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2045        return 0;
2046}
2047
2048/*
2049 * Create an mds request.
2050 */
2051struct ceph_mds_request *
2052ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2053{
2054        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
2055        struct timespec64 ts;
2056
2057        if (!req)
2058                return ERR_PTR(-ENOMEM);
2059
2060        mutex_init(&req->r_fill_mutex);
2061        req->r_mdsc = mdsc;
2062        req->r_started = jiffies;
2063        req->r_resend_mds = -1;
2064        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2065        INIT_LIST_HEAD(&req->r_unsafe_target_item);
2066        req->r_fmode = -1;
2067        kref_init(&req->r_kref);
2068        RB_CLEAR_NODE(&req->r_node);
2069        INIT_LIST_HEAD(&req->r_wait);
2070        init_completion(&req->r_completion);
2071        init_completion(&req->r_safe_completion);
2072        INIT_LIST_HEAD(&req->r_unsafe_item);
2073
2074        ktime_get_coarse_real_ts64(&ts);
2075        req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
2076
2077        req->r_op = op;
2078        req->r_direct_mode = mode;
2079        return req;
2080}
2081
2082/*
2083 * return oldest (lowest) request, tid in request tree, 0 if none.
2084 * return the oldest (lowest tid) request in the request tree, or NULL if none.
2085 * called under mdsc->mutex.
2086 */
2087static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2088{
2089        if (RB_EMPTY_ROOT(&mdsc->request_tree))
2090                return NULL;
2091        return rb_entry(rb_first(&mdsc->request_tree),
2092                        struct ceph_mds_request, r_node);
2093}
2094
2095static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2096{
2097        return mdsc->oldest_tid;
2098}
2099
2100/*
2101 * Build a dentry's path.  The result must be freed with
2102 * ceph_mdsc_free_path().  Based on build_path_from_dentry in fs/cifs/dir.c.
2103 *
2104 * If @stop_on_nosnap, generate path relative to the first non-snapped
2105 * inode.
2106 *
2107 * Encode hidden .snap dirs as a double /, i.e.
2108 *   foo/.snap/bar -> foo//bar
2109 */
2110char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2111                           int stop_on_nosnap)
2112{
2113        struct dentry *temp;
2114        char *path;
2115        int pos;
2116        unsigned seq;
2117        u64 base;
2118
2119        if (!dentry)
2120                return ERR_PTR(-EINVAL);
2121
2122        path = __getname();
2123        if (!path)
2124                return ERR_PTR(-ENOMEM);
2125retry:
2126        pos = PATH_MAX - 1;
2127        path[pos] = '\0';
2128
2129        seq = read_seqbegin(&rename_lock);
2130        rcu_read_lock();
2131        temp = dentry;
2132        for (;;) {
2133                struct inode *inode;
2134
2135                spin_lock(&temp->d_lock);
2136                inode = d_inode(temp);
2137                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2138                        dout("build_path path+%d: %p SNAPDIR\n",
2139                             pos, temp);
2140                } else if (stop_on_nosnap && inode && dentry != temp &&
2141                           ceph_snap(inode) == CEPH_NOSNAP) {
2142                        spin_unlock(&temp->d_lock);
2143                        pos++; /* get rid of any prepended '/' */
2144                        break;
2145                } else {
2146                        pos -= temp->d_name.len;
2147                        if (pos < 0) {
2148                                spin_unlock(&temp->d_lock);
2149                                break;
2150                        }
2151                        memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2152                }
2153                spin_unlock(&temp->d_lock);
2154                temp = READ_ONCE(temp->d_parent);
2155
2156                /* Are we at the root? */
2157                if (IS_ROOT(temp))
2158                        break;
2159
2160                /* Are we out of buffer? */
2161                if (--pos < 0)
2162                        break;
2163
2164                path[pos] = '/';
2165        }
2166        base = ceph_ino(d_inode(temp));
2167        rcu_read_unlock();
2168        if (pos < 0 || read_seqretry(&rename_lock, seq)) {
2169                pr_err("build_path did not end path lookup where "
2170                       "expected, pos is %d\n", pos);
2171                /* presumably this is only possible if racing with a
2172                 * rename of one of the parent directories (we cannot
2173                 * lock the dentries above us to prevent this, but
2174                 * retrying should be harmless) */
2175                goto retry;
2176        }
2177
2178        *pbase = base;
2179        *plen = PATH_MAX - 1 - pos;
2180        dout("build_path on %p %d built %llx '%.*s'\n",
2181             dentry, d_count(dentry), base, *plen, path + pos);
2182        return path + pos;
2183}
2184
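    /*
     * Encode a dentry as a (parent ino, name) pair when the parent is
     * locked and not snapped; otherwise build a full path with
     * ceph_mdsc_build_path().
     */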
2185static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2186                             const char **ppath, int *ppathlen, u64 *pino,
2187                             bool *pfreepath, bool parent_locked)
2188{
2189        char *path;
2190
2191        rcu_read_lock();
2192        if (!dir)
2193                dir = d_inode_rcu(dentry->d_parent);
2194        if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2195                *pino = ceph_ino(dir);
2196                rcu_read_unlock();
2197                *ppath = dentry->d_name.name;
2198                *ppathlen = dentry->d_name.len;
2199                return 0;
2200        }
2201        rcu_read_unlock();
2202        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2203        if (IS_ERR(path))
2204                return PTR_ERR(path);
2205        *ppath = path;
2206        *pfreepath = true;
2207        return 0;
2208}
2209
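    /*
     * Encode an inode as a bare ino when it is not snapped; otherwise
     * build a path from one of its dentry aliases.
     */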
2210static int build_inode_path(struct inode *inode,
2211                            const char **ppath, int *ppathlen, u64 *pino,
2212                            bool *pfreepath)
2213{
2214        struct dentry *dentry;
2215        char *path;
2216
2217        if (ceph_snap(inode) == CEPH_NOSNAP) {
2218                *pino = ceph_ino(inode);
2219                *ppathlen = 0;
2220                return 0;
2221        }
2222        dentry = d_find_alias(inode);
2223        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2224        dput(dentry);
2225        if (IS_ERR(path))
2226                return PTR_ERR(path);
2227        *ppath = path;
2228        *pfreepath = true;
2229        return 0;
2230}
2231
2232/*
2233 * request arguments may be specified via an inode *, a dentry *, or
2234 * an explicit ino+path.
2235 */
2236static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2237                                  struct inode *rdiri, const char *rpath,
2238                                  u64 rino, const char **ppath, int *pathlen,
2239                                  u64 *ino, bool *freepath, bool parent_locked)
2240{
2241        int r = 0;
2242
2243        if (rinode) {
2244                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2245                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2246                     ceph_snap(rinode));
2247        } else if (rdentry) {
2248                r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2249                                        freepath, parent_locked);
2250                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2251                     *ppath);
2252        } else if (rpath || rino) {
2253                *ino = rino;
2254                *ppath = rpath;
2255                *pathlen = rpath ? strlen(rpath) : 0;
2256                dout(" path %.*s\n", *pathlen, rpath);
2257        }
2258
2259        return r;
2260}
2261
2262/*
2263 * called under mdsc->mutex
2264 */
2265static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2266                                               struct ceph_mds_request *req,
2267                                               int mds, bool drop_cap_releases)
2268{
2269        struct ceph_msg *msg;
2270        struct ceph_mds_request_head *head;
2271        const char *path1 = NULL;
2272        const char *path2 = NULL;
2273        u64 ino1 = 0, ino2 = 0;
2274        int pathlen1 = 0, pathlen2 = 0;
2275        bool freepath1 = false, freepath2 = false;
2276        int len;
2277        u16 releases;
2278        void *p, *end;
2279        int ret;
2280
2281        ret = set_request_path_attr(req->r_inode, req->r_dentry,
2282                              req->r_parent, req->r_path1, req->r_ino1.ino,
2283                              &path1, &pathlen1, &ino1, &freepath1,
2284                              test_bit(CEPH_MDS_R_PARENT_LOCKED,
2285                                        &req->r_req_flags));
2286        if (ret < 0) {
2287                msg = ERR_PTR(ret);
2288                goto out;
2289        }
2290
2291        /* If r_old_dentry is set, then assume that its parent is locked */
2292        ret = set_request_path_attr(NULL, req->r_old_dentry,
2293                              req->r_old_dentry_dir,
2294                              req->r_path2, req->r_ino2.ino,
2295                              &path2, &pathlen2, &ino2, &freepath2, true);
2296        if (ret < 0) {
2297                msg = ERR_PTR(ret);
2298                goto out_free1;
2299        }
2300
2301        len = sizeof(*head) +
2302                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2303                sizeof(struct ceph_timespec);
2304
2305        /* calculate (max) length for cap releases */
2306        len += sizeof(struct ceph_mds_request_release) *
2307                (!!req->r_inode_drop + !!req->r_dentry_drop +
2308                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2309        if (req->r_dentry_drop)
2310                len += pathlen1;
2311        if (req->r_old_dentry_drop)
2312                len += pathlen2;
2313
2314        msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2315        if (!msg) {
2316                msg = ERR_PTR(-ENOMEM);
2317                goto out_free2;
2318        }
2319
2320        msg->hdr.version = cpu_to_le16(2);
2321        msg->hdr.tid = cpu_to_le64(req->r_tid);
2322
2323        head = msg->front.iov_base;
2324        p = msg->front.iov_base + sizeof(*head);
2325        end = msg->front.iov_base + msg->front.iov_len;
2326
2327        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2328        head->op = cpu_to_le32(req->r_op);
2329        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2330        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2331        head->args = req->r_args;
2332
2333        ceph_encode_filepath(&p, end, ino1, path1);
2334        ceph_encode_filepath(&p, end, ino2, path2);
2335
2336        /* make note of release offset, in case we need to replay */
2337        req->r_request_release_offset = p - msg->front.iov_base;
2338
2339        /* cap releases */
2340        releases = 0;
2341        if (req->r_inode_drop)
2342                releases += ceph_encode_inode_release(&p,
2343                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2344                      mds, req->r_inode_drop, req->r_inode_unless, 0);
2345        if (req->r_dentry_drop)
2346                releases += ceph_encode_dentry_release(&p, req->r_dentry,
2347                                req->r_parent, mds, req->r_dentry_drop,
2348                                req->r_dentry_unless);
2349        if (req->r_old_dentry_drop)
2350                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2351                                req->r_old_dentry_dir, mds,
2352                                req->r_old_dentry_drop,
2353                                req->r_old_dentry_unless);
2354        if (req->r_old_inode_drop)
2355                releases += ceph_encode_inode_release(&p,
2356                      d_inode(req->r_old_dentry),
2357                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2358
2359        if (drop_cap_releases) {
2360                releases = 0;
2361                p = msg->front.iov_base + req->r_request_release_offset;
2362        }
2363
2364        head->num_releases = cpu_to_le16(releases);
2365
2366        /* time stamp */
2367        {
2368                struct ceph_timespec ts;
2369                ceph_encode_timespec64(&ts, &req->r_stamp);
2370                ceph_encode_copy(&p, &ts, sizeof(ts));
2371        }
2372
2373        BUG_ON(p > end);
2374        msg->front.iov_len = p - msg->front.iov_base;
2375        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2376
2377        if (req->r_pagelist) {
2378                struct ceph_pagelist *pagelist = req->r_pagelist;
2379                ceph_msg_data_add_pagelist(msg, pagelist);
2380                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2381        } else {
2382                msg->hdr.data_len = 0;
2383        }
2384
2385        msg->hdr.data_off = cpu_to_le16(0);
2386
2387out_free2:
2388        if (freepath2)
2389                ceph_mdsc_free_path((char *)path2, pathlen2);
2390out_free1:
2391        if (freepath1)
2392                ceph_mdsc_free_path((char *)path1, pathlen1);
2393out:
2394        return msg;
2395}
2396
2397/*
2398 * called under mdsc->mutex if error, under no mutex if
2399 * success.
2400 */
2401static void complete_request(struct ceph_mds_client *mdsc,
2402                             struct ceph_mds_request *req)
2403{
2404        if (req->r_callback)
2405                req->r_callback(mdsc, req);
2406        complete_all(&req->r_completion);
2407}
2408
2409/*
2410 * called under mdsc->mutex
2411 */
2412static int __prepare_send_request(struct ceph_mds_client *mdsc,
2413                                  struct ceph_mds_request *req,
2414                                  int mds, bool drop_cap_releases)
2415{
2416        struct ceph_mds_request_head *rhead;
2417        struct ceph_msg *msg;
2418        int flags = 0;
2419
2420        req->r_attempts++;
2421        if (req->r_inode) {
2422                struct ceph_cap *cap =
2423                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2424
2425                if (cap)
2426                        req->r_sent_on_mseq = cap->mseq;
2427                else
2428                        req->r_sent_on_mseq = -1;
2429        }
2430        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2431             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2432
2433        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2434                void *p;
2435                /*
2436                 * Replay.  Do not regenerate message (and rebuild
2437                 * paths, etc.); just use the original message.
2438                 * Rebuilding paths will break for renames because
2439                 * d_move mangles the src name.
2440                 */
2441                msg = req->r_request;
2442                rhead = msg->front.iov_base;
2443
2444                flags = le32_to_cpu(rhead->flags);
2445                flags |= CEPH_MDS_FLAG_REPLAY;
2446                rhead->flags = cpu_to_le32(flags);
2447
2448                if (req->r_target_inode)
2449                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2450
2451                rhead->num_retry = req->r_attempts - 1;
2452
2453                /* remove cap/dentry releases from message */
2454                rhead->num_releases = 0;
2455
2456                /* time stamp */
2457                p = msg->front.iov_base + req->r_request_release_offset;
2458                {
2459                        struct ceph_timespec ts;
2460                        ceph_encode_timespec64(&ts, &req->r_stamp);
2461                        ceph_encode_copy(&p, &ts, sizeof(ts));
2462                }
2463
2464                msg->front.iov_len = p - msg->front.iov_base;
2465                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2466                return 0;
2467        }
2468
2469        if (req->r_request) {
2470                ceph_msg_put(req->r_request);
2471                req->r_request = NULL;
2472        }
2473        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2474        if (IS_ERR(msg)) {
2475                req->r_err = PTR_ERR(msg);
2476                return PTR_ERR(msg);
2477        }
2478        req->r_request = msg;
2479
2480        rhead = msg->front.iov_base;
2481        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2482        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2483                flags |= CEPH_MDS_FLAG_REPLAY;
2484        if (req->r_parent)
2485                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2486        rhead->flags = cpu_to_le32(flags);
2487        rhead->num_fwd = req->r_num_fwd;
2488        rhead->num_retry = req->r_attempts - 1;
2489        rhead->ino = 0;
2490
2491        dout(" r_parent = %p\n", req->r_parent);
2492        return 0;
2493}
2494
2495/*
2496 * send request, or put it on the appropriate wait list.
2497 */
2498static void __do_request(struct ceph_mds_client *mdsc,
2499                        struct ceph_mds_request *req)
2500{
2501        struct ceph_mds_session *session = NULL;
2502        int mds = -1;
2503        int err = 0;
2504
2505        if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2506                if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2507                        __unregister_request(mdsc, req);
2508                return;
2509        }
2510
2511        if (req->r_timeout &&
2512            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2513                dout("do_request timed out\n");
2514                err = -EIO;
2515                goto finish;
2516        }
2517        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2518                dout("do_request forced umount\n");
2519                err = -EIO;
2520                goto finish;
2521        }
2522        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2523                if (mdsc->mdsmap_err) {
2524                        err = mdsc->mdsmap_err;
2525                        dout("do_request mdsmap err %d\n", err);
2526                        goto finish;
2527                }
2528                if (mdsc->mdsmap->m_epoch == 0) {
2529                        dout("do_request no mdsmap, waiting for map\n");
2530                        list_add(&req->r_wait, &mdsc->waiting_for_map);
2531                        return;
2532                }
2533                if (!(mdsc->fsc->mount_options->flags &
2534                      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2535                    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2536                        err = -ENOENT;
2537                        pr_info("probably no mds server is up\n");
2538                        goto finish;
2539                }
2540        }
2541
2542        put_request_session(req);
2543
2544        mds = __choose_mds(mdsc, req);
2545        if (mds < 0 ||
2546            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2547                dout("do_request no mds or not active, waiting for map\n");
2548                list_add(&req->r_wait, &mdsc->waiting_for_map);
2549                return;
2550        }
2551
2552        /* get, open session */
2553        session = __ceph_lookup_mds_session(mdsc, mds);
2554        if (!session) {
2555                session = register_session(mdsc, mds);
2556                if (IS_ERR(session)) {
2557                        err = PTR_ERR(session);
2558                        goto finish;
2559                }
2560        }
2561        req->r_session = get_session(session);
2562
2563        dout("do_request mds%d session %p state %s\n", mds, session,
2564             ceph_session_state_name(session->s_state));
2565        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2566            session->s_state != CEPH_MDS_SESSION_HUNG) {
2567                if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2568                        err = -EACCES;
2569                        goto out_session;
2570                }
2571                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2572                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2573                        __open_session(mdsc, session);
2574                list_add(&req->r_wait, &session->s_waiting);
2575                goto out_session;
2576        }
2577
2578        /* send request */
2579        req->r_resend_mds = -1;   /* forget any previous mds hint */
2580
2581        if (req->r_request_started == 0)   /* note request start time */
2582                req->r_request_started = jiffies;
2583
2584        err = __prepare_send_request(mdsc, req, mds, false);
2585        if (!err) {
2586                ceph_msg_get(req->r_request);
2587                ceph_con_send(&session->s_con, req->r_request);
2588        }
2589
2590out_session:
2591        ceph_put_mds_session(session);
2592finish:
2593        if (err) {
2594                dout("__do_request early error %d\n", err);
2595                req->r_err = err;
2596                complete_request(mdsc, req);
2597                __unregister_request(mdsc, req);
2598        }
2599        return;
2600}
2601
2602/*
2603 * called under mdsc->mutex
2604 */
2605static void __wake_requests(struct ceph_mds_client *mdsc,
2606                            struct list_head *head)
2607{
2608        struct ceph_mds_request *req;
2609        LIST_HEAD(tmp_list);
2610
2611        list_splice_init(head, &tmp_list);
2612
2613        while (!list_empty(&tmp_list)) {
2614                req = list_entry(tmp_list.next,
2615                                 struct ceph_mds_request, r_wait);
2616                list_del_init(&req->r_wait);
2617                dout(" wake request %p tid %llu\n", req, req->r_tid);
2618                __do_request(mdsc, req);
2619        }
2620}
2621
2622/*
2623 * Wake up threads with requests pending for @mds, so that they can
2624 * resubmit their requests to a possibly different mds.
2625 */
2626static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2627{
2628        struct ceph_mds_request *req;
2629        struct rb_node *p = rb_first(&mdsc->request_tree);
2630
2631        dout("kick_requests mds%d\n", mds);
2632        while (p) {
2633                req = rb_entry(p, struct ceph_mds_request, r_node);
2634                p = rb_next(p);
2635                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2636                        continue;
2637                if (req->r_attempts > 0)
2638                        continue; /* only new requests */
2639                if (req->r_session &&
2640                    req->r_session->s_mds == mds) {
2641                        dout(" kicking tid %llu\n", req->r_tid);
2642                        list_del_init(&req->r_wait);
2643                        __do_request(mdsc, req);
2644                }
2645        }
2646}
2647
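    /*
     * Register the request and make the initial attempt to send it.
     * CAP_PIN references are taken on the inodes involved so the
     * corresponding caps stay pinned while the request is in flight.
     */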
2648int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2649                              struct ceph_mds_request *req)
2650{
2651        int err;
2652
2653        /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2654        if (req->r_inode)
2655                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2656        if (req->r_parent)
2657                ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2658        if (req->r_old_dentry_dir)
2659                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2660                                  CEPH_CAP_PIN);
2661
2662        dout("submit_request on %p for inode %p\n", req, dir);
2663        mutex_lock(&mdsc->mutex);
2664        __register_request(mdsc, req, dir);
2665        __do_request(mdsc, req);
2666        err = req->r_err;
2667        mutex_unlock(&mdsc->mutex);
2668        return err;
2669}
2670
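    /*
     * Wait for a submitted request to complete, honoring any custom
     * wait routine or timeout.  If the wait fails (killed or timed out)
     * and no real reply raced in, mark the request aborted under
     * r_fill_mutex so it cannot race with ceph_fill_trace().
     */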
2671static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2672                                  struct ceph_mds_request *req)
2673{
2674        int err;
2675
2676        /* wait */
2677        dout("do_request waiting\n");
2678        if (!req->r_timeout && req->r_wait_for_completion) {
2679                err = req->r_wait_for_completion(mdsc, req);
2680        } else {
2681                long timeleft = wait_for_completion_killable_timeout(
2682                                        &req->r_completion,
2683                                        ceph_timeout_jiffies(req->r_timeout));
2684                if (timeleft > 0)
2685                        err = 0;
2686                else if (!timeleft)
2687                        err = -EIO;  /* timed out */
2688                else
2689                        err = timeleft;  /* killed */
2690        }
2691        dout("do_request waited, got %d\n", err);
2692        mutex_lock(&mdsc->mutex);
2693
2694        /* only abort if we didn't race with a real reply */
2695        if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2696                err = le32_to_cpu(req->r_reply_info.head->result);
2697        } else if (err < 0) {
2698                dout("aborted request %lld with %d\n", req->r_tid, err);
2699
2700                /*
2701                 * ensure we aren't running concurrently with
2702                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2703                 * rely on locks (dir mutex) held by our caller.
2704                 */
2705                mutex_lock(&req->r_fill_mutex);
2706                req->r_err = err;
2707                set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2708                mutex_unlock(&req->r_fill_mutex);
2709
2710                if (req->r_parent &&
2711                    (req->r_op & CEPH_MDS_OP_WRITE))
2712                        ceph_invalidate_dir_request(req);
2713        } else {
2714                err = req->r_err;
2715        }
2716
2717        mutex_unlock(&mdsc->mutex);
2718        return err;
2719}
2720
2721/*
2722 * Synchronously perform an mds request.  Takes care of all of the
2723 * session setup, forwarding, and retry details.
2724 */
2725int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2726                         struct inode *dir,
2727                         struct ceph_mds_request *req)
2728{
2729        int err;
2730
2731        dout("do_request on %p\n", req);
2732
2733        /* issue */
2734        err = ceph_mdsc_submit_request(mdsc, dir, req);
2735        if (!err)
2736                err = ceph_mdsc_wait_request(mdsc, req);
2737        dout("do_request %p done, result %d\n", req, err);
2738        return err;
2739}
2740
2741/*
2742 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2743 * namespace request.
2744 */
2745void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2746{
2747        struct inode *dir = req->r_parent;
2748        struct inode *old_dir = req->r_old_dentry_dir;
2749
2750        dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2751
2752        ceph_dir_clear_complete(dir);
2753        if (old_dir)
2754                ceph_dir_clear_complete(old_dir);
2755        if (req->r_dentry)
2756                ceph_invalidate_dentry_lease(req->r_dentry);
2757        if (req->r_old_dentry)
2758                ceph_invalidate_dentry_lease(req->r_old_dentry);
2759}
2760
2761/*
2762 * Handle mds reply.
2763 *
2764 * We take the session mutex and parse and process the reply immediately.
2765 * This preserves the logical ordering of replies, capabilities, etc., sent
2766 * by the MDS as they are applied to our local cache.
2767 */
2768static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2769{
2770        struct ceph_mds_client *mdsc = session->s_mdsc;
2771        struct ceph_mds_request *req;
2772        struct ceph_mds_reply_head *head = msg->front.iov_base;
2773        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2774        struct ceph_snap_realm *realm;
2775        u64 tid;
2776        int err, result;
2777        int mds = session->s_mds;
2778
2779        if (msg->front.iov_len < sizeof(*head)) {
2780                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2781                ceph_msg_dump(msg);
2782                return;
2783        }
2784
2785        /* get request, session */
2786        tid = le64_to_cpu(msg->hdr.tid);
2787        mutex_lock(&mdsc->mutex);
2788        req = lookup_get_request(mdsc, tid);
2789        if (!req) {
2790                dout("handle_reply on unknown tid %llu\n", tid);
2791                mutex_unlock(&mdsc->mutex);
2792                return;
2793        }
2794        dout("handle_reply %p\n", req);
2795
2796        /* correct session? */
2797        if (req->r_session != session) {
2798                pr_err("mdsc_handle_reply got %llu on session mds%d"
2799                       " not mds%d\n", tid, session->s_mds,
2800                       req->r_session ? req->r_session->s_mds : -1);
2801                mutex_unlock(&mdsc->mutex);
2802                goto out;
2803        }
2804
2805        /* dup? */
2806        if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2807            (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2808                pr_warn("got a dup %s reply on %llu from mds%d\n",
2809                           head->safe ? "safe" : "unsafe", tid, mds);
2810                mutex_unlock(&mdsc->mutex);
2811                goto out;
2812        }
2813        if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2814                pr_warn("got unsafe after safe on %llu from mds%d\n",
2815                           tid, mds);
2816                mutex_unlock(&mdsc->mutex);
2817                goto out;
2818        }
2819
2820        result = le32_to_cpu(head->result);
2821
2822        /*
2823         * Handle an ESTALE:
2824         * if we're not talking to the authority, resend to it;
2825         * if the authority has changed while we weren't looking,
2826         * resend to the new authority.
2827         * Otherwise we just have to return the ESTALE.
2828         */
2829        if (result == -ESTALE) {
2830                dout("got ESTALE on request %llu\n", req->r_tid);
2831                req->r_resend_mds = -1;
2832                if (req->r_direct_mode != USE_AUTH_MDS) {
2833                        dout("not using auth, setting for that now\n");
2834                        req->r_direct_mode = USE_AUTH_MDS;
2835                        __do_request(mdsc, req);
2836                        mutex_unlock(&mdsc->mutex);
2837                        goto out;
2838                } else  {
2839                        int mds = __choose_mds(mdsc, req);
2840                        if (mds >= 0 && mds != req->r_session->s_mds) {
2841                                dout("but auth changed, so resending\n");
2842                                __do_request(mdsc, req);
2843                                mutex_unlock(&mdsc->mutex);
2844                                goto out;
2845                        }
2846                }
2847                dout("have to return ESTALE on request %llu\n", req->r_tid);
2848        }
2849
2850
2851        if (head->safe) {
2852                set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2853                __unregister_request(mdsc, req);
2854
2855                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2856                        /*
2857                         * We already handled the unsafe response, now do the
2858                         * cleanup.  No need to examine the response; the MDS
2859                         * doesn't include any result info in the safe
2860                         * response.  And even if it did, there is nothing
2861                         * useful we could do with a revised return value.
2862                         */
2863                        dout("got safe reply %llu, mds%d\n", tid, mds);
2864
2865                        /* last unsafe request during umount? */
2866                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2867                                complete_all(&mdsc->safe_umount_waiters);
2868                        mutex_unlock(&mdsc->mutex);
2869                        goto out;
2870                }
2871        } else {
2872                set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2873                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2874                if (req->r_unsafe_dir) {
2875                        struct ceph_inode_info *ci =
2876                                        ceph_inode(req->r_unsafe_dir);
2877                        spin_lock(&ci->i_unsafe_lock);
2878                        list_add_tail(&req->r_unsafe_dir_item,
2879                                      &ci->i_unsafe_dirops);
2880                        spin_unlock(&ci->i_unsafe_lock);
2881                }
2882        }
2883
2884        dout("handle_reply tid %lld result %d\n", tid, result);
2885        rinfo = &req->r_reply_info;
2886        if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
2887                err = parse_reply_info(msg, rinfo, (u64)-1);
2888        else
2889                err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2890        mutex_unlock(&mdsc->mutex);
2891
2892        mutex_lock(&session->s_mutex);
2893        if (err < 0) {
2894                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2895                ceph_msg_dump(msg);
2896                goto out_err;
2897        }
2898
2899        /* snap trace */
2900        realm = NULL;
2901        if (rinfo->snapblob_len) {
2902                down_write(&mdsc->snap_rwsem);
2903                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2904                                rinfo->snapblob + rinfo->snapblob_len,
2905                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2906                                &realm);
2907                downgrade_write(&mdsc->snap_rwsem);
2908        } else {
2909                down_read(&mdsc->snap_rwsem);
2910        }
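        /*
         * Either way we now hold snap_rwsem for read across the trace
         * fill below; when a snap trace was present, the write lock is
         * downgraded rather than dropped so nobody can change the snap
         * realm hierarchy between the update and the fill.
         */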
2911
2912        /* insert trace into our cache */
2913        mutex_lock(&req->r_fill_mutex);
2914        current->journal_info = req;
2915        err = ceph_fill_trace(mdsc->fsc->sb, req);
2916        if (err == 0) {
2917                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2918                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2919                        ceph_readdir_prepopulate(req, req->r_session);
2920        }
2921        current->journal_info = NULL;
2922        mutex_unlock(&req->r_fill_mutex);
2923
2924        up_read(&mdsc->snap_rwsem);
2925        if (realm)
2926                ceph_put_snap_realm(mdsc, realm);
2927
2928        if (err == 0) {
2929                if (req->r_target_inode &&
2930                    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2931                        struct ceph_inode_info *ci =
2932                                ceph_inode(req->r_target_inode);
2933                        spin_lock(&ci->i_unsafe_lock);
2934                        list_add_tail(&req->r_unsafe_target_item,
2935                                      &ci->i_unsafe_iops);
2936                        spin_unlock(&ci->i_unsafe_lock);
2937                }
2938
2939                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2940        }
2941out_err:
2942        mutex_lock(&mdsc->mutex);
2943        if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2944                if (err) {
2945                        req->r_err = err;
2946                } else {
2947                        req->r_reply = ceph_msg_get(msg);
2948                        set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2949                }
2950        } else {
2951                dout("reply arrived after request %lld was aborted\n", tid);
2952        }
2953        mutex_unlock(&mdsc->mutex);
2954
2955        mutex_unlock(&session->s_mutex);
2956
2957        /* kick calling process */
2958        complete_request(mdsc, req);
2959out:
2960        ceph_mdsc_put_request(req);
2961        return;
2962}
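/*
 * A rough sketch of the two-phase reply flow handled above:
 *
 *   client                              mds
 *     |---- request ------------------->|
 *     |<--- unsafe reply (early) -------|  GOT_UNSAFE set, req kept on
 *     |      ... mds commits the op ... |  s_unsafe / i_unsafe lists
 *     |<--- safe reply (committed) -----|  GOT_SAFE set, req unregistered
 *
 * The safe reply carries no new result info, so it is used purely for
 * cleanup (and to wake safe_umount_waiters during unmount).
 */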
2963
2964
2965
2966/*
2967 * handle mds notification that our request has been forwarded.
2968 */
2969static void handle_forward(struct ceph_mds_client *mdsc,
2970                           struct ceph_mds_session *session,
2971                           struct ceph_msg *msg)
2972{
2973        struct ceph_mds_request *req;
2974        u64 tid = le64_to_cpu(msg->hdr.tid);
2975        u32 next_mds;
2976        u32 fwd_seq;
2977        int err = -EINVAL;
2978        void *p = msg->front.iov_base;
2979        void *end = p + msg->front.iov_len;
2980
2981        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2982        next_mds = ceph_decode_32(&p);
2983        fwd_seq = ceph_decode_32(&p);
2984
2985        mutex_lock(&mdsc->mutex);
2986        req = lookup_get_request(mdsc, tid);
2987        if (!req) {
2988                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2989                goto out;  /* dup reply? */
2990        }
2991
2992        if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2993                dout("forward tid %llu aborted, unregistering\n", tid);
2994                __unregister_request(mdsc, req);
2995        } else if (fwd_seq <= req->r_num_fwd) {
2996                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2997                     tid, next_mds, fwd_seq, req->r_num_fwd);
2998        } else {
2999                /* resend. forward race not possible; mds would drop */
3000                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3001                BUG_ON(req->r_err);
3002                BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3003                req->r_attempts = 0;
3004                req->r_num_fwd = fwd_seq;
3005                req->r_resend_mds = next_mds;
3006                put_request_session(req);
3007                __do_request(mdsc, req);
3008        }
3009        ceph_mdsc_put_request(req);
3010out:
3011        mutex_unlock(&mdsc->mutex);
3012        return;
3013
3014bad:
3015        pr_err("mdsc_handle_forward decode error err=%d\n", err);
3016}
3017
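/*
 * Skip the client metadata in a session message.  The encoding is a
 * standard ceph map<string,string>, roughly:
 *
 *   le32 n;
 *   n * { le32 klen; char key[klen]; le32 vlen; char val[vlen]; }
 *
 * We do not care about the contents, so the pairs are decoded only to
 * advance *p past them.
 */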
3018static int __decode_and_drop_session_metadata(void **p, void *end)
3019{
3020        /* map<string,string> */
3021        u32 n;
3022        ceph_decode_32_safe(p, end, n, bad);
3023        while (n-- > 0) {
3024                u32 len;
3025                ceph_decode_32_safe(p, end, len, bad);
3026                ceph_decode_need(p, end, len, bad);
3027                *p += len;
3028                ceph_decode_32_safe(p, end, len, bad);
3029                ceph_decode_need(p, end, len, bad);
3030                *p += len;
3031        }
3032        return 0;
3033bad:
3034        return -1;
3035}
3036
3037/*
3038 * handle a mds session control message
3039 */
3040static void handle_session(struct ceph_mds_session *session,
3041                           struct ceph_msg *msg)
3042{
3043        struct ceph_mds_client *mdsc = session->s_mdsc;
3044        int mds = session->s_mds;
3045        int msg_version = le16_to_cpu(msg->hdr.version);
3046        void *p = msg->front.iov_base;
3047        void *end = p + msg->front.iov_len;
3048        struct ceph_mds_session_head *h;
3049        u32 op;
3050        u64 seq;
3051        unsigned long features = 0;
3052        int wake = 0;
3053
3054        /* decode */
3055        ceph_decode_need(&p, end, sizeof(*h), bad);
3056        h = p;
3057        p += sizeof(*h);
3058
3059        op = le32_to_cpu(h->op);
3060        seq = le64_to_cpu(h->seq);
3061
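        /*
         * Newer session messages append extra fields after the basic
         * header, roughly:
         *   v2+: map<string,string> client metadata (skipped below)
         *   v3+: le32 len + feature bit blob; only the first
         *        sizeof(features) bytes are kept
         */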
3062        if (msg_version >= 3) {
3063                u32 len;
3064                /* version >= 2, metadata */
3065                if (__decode_and_drop_session_metadata(&p, end) < 0)
3066                        goto bad;
3067                /* version >= 3, feature bits */
3068                ceph_decode_32_safe(&p, end, len, bad);
3069                ceph_decode_need(&p, end, len, bad);
3070                memcpy(&features, p, min_t(size_t, len, sizeof(features)));
3071                p += len;
3072        }
3073
3074        mutex_lock(&mdsc->mutex);
3075        if (op == CEPH_SESSION_CLOSE) {
3076                get_session(session);
3077                __unregister_session(mdsc, session);
3078        }
3079        /* FIXME: this ttl calculation is generous */
3080        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3081        mutex_unlock(&mdsc->mutex);
3082
3083        mutex_lock(&session->s_mutex);
3084
3085        dout("handle_session mds%d %s %p state %s seq %llu\n",
3086             mds, ceph_session_op_name(op), session,
3087             ceph_session_state_name(session->s_state), seq);
3088
3089        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3090                session->s_state = CEPH_MDS_SESSION_OPEN;
3091                pr_info("mds%d came back\n", session->s_mds);
3092        }
3093
3094        switch (op) {
3095        case CEPH_SESSION_OPEN:
3096                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3097                        pr_info("mds%d reconnect success\n", session->s_mds);
3098                session->s_state = CEPH_MDS_SESSION_OPEN;
3099                session->s_features = features;
3100                renewed_caps(mdsc, session, 0);
3101                wake = 1;
3102                if (mdsc->stopping)
3103                        __close_session(mdsc, session);
3104                break;
3105
3106        case CEPH_SESSION_RENEWCAPS:
3107                if (session->s_renew_seq == seq)
3108                        renewed_caps(mdsc, session, 1);
3109                break;
3110
3111        case CEPH_SESSION_CLOSE:
3112                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3113                        pr_info("mds%d reconnect denied\n", session->s_mds);
3114                cleanup_session_requests(mdsc, session);
3115                remove_session_caps(session);
3116                wake = 2; /* for good measure */
3117                wake_up_all(&mdsc->session_close_wq);
3118                break;
3119
3120        case CEPH_SESSION_STALE:
3121                pr_info("mds%d caps went stale, renewing\n",
3122                        session->s_mds);
3123                spin_lock(&session->s_gen_ttl_lock);
3124                session->s_cap_gen++;
3125                session->s_cap_ttl = jiffies - 1;
3126                spin_unlock(&session->s_gen_ttl_lock);
3127                send_renew_caps(mdsc, session);
3128                break;
3129
3130        case CEPH_SESSION_RECALL_STATE:
3131                ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3132                break;
3133
3134        case CEPH_SESSION_FLUSHMSG:
3135                send_flushmsg_ack(mdsc, session, seq);
3136                break;
3137
3138        case CEPH_SESSION_FORCE_RO:
3139                dout("force_session_readonly %p\n", session);
3140                spin_lock(&session->s_cap_lock);
3141                session->s_readonly = true;
3142                spin_unlock(&session->s_cap_lock);
3143                wake_up_session_caps(session, FORCE_RO);
3144                break;
3145
3146        case CEPH_SESSION_REJECT:
3147                WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3148                pr_info("mds%d rejected session\n", session->s_mds);
3149                session->s_state = CEPH_MDS_SESSION_REJECTED;
3150                cleanup_session_requests(mdsc, session);
3151                remove_session_caps(session);
3152                wake = 2; /* for good measure */
3153                break;
3154
3155        default:
3156                pr_err("mdsc_handle_session bad op %u mds%d\n", op, mds);
3157                WARN_ON(1);
3158        }
3159
3160        mutex_unlock(&session->s_mutex);
3161        if (wake) {
3162                mutex_lock(&mdsc->mutex);
3163                __wake_requests(mdsc, &session->s_waiting);
3164                if (wake == 2)
3165                        kick_requests(mdsc, mds);
3166                mutex_unlock(&mdsc->mutex);
3167        }
3168        if (op == CEPH_SESSION_CLOSE)
3169                ceph_put_mds_session(session);
3170        return;
3171
3172bad:
3173        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3174               (int)msg->front.iov_len);
3175        ceph_msg_dump(msg);
3176        return;
3177}
3178
3179
3180/*
3181 * called under session->mutex.
3182 */
3183static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3184                                   struct ceph_mds_session *session)
3185{
3186        struct ceph_mds_request *req, *nreq;
3187        struct rb_node *p;
3188        int err;
3189
3190        dout("replay_unsafe_requests mds%d\n", session->s_mds);
3191
3192        mutex_lock(&mdsc->mutex);
3193        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
3194                err = __prepare_send_request(mdsc, req, session->s_mds, true);
3195                if (!err) {
3196                        ceph_msg_get(req->r_request);
3197                        ceph_con_send(&session->s_con, req->r_request);
3198                }
3199        }
3200
3201        /*
3202         * Also re-send old requests when the MDS enters the reconnect stage,
3203         * so that it can process completed requests in its clientreplay stage.
3204         */
3205        p = rb_first(&mdsc->request_tree);
3206        while (p) {
3207                req = rb_entry(p, struct ceph_mds_request, r_node);
3208                p = rb_next(p);
3209                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3210                        continue;
3211                if (req->r_attempts == 0)
3212                        continue; /* only old requests */
3213                if (req->r_session &&
3214                    req->r_session->s_mds == session->s_mds) {
3215                        err = __prepare_send_request(mdsc, req,
3216                                                     session->s_mds, true);
3217                        if (!err) {
3218                                ceph_msg_get(req->r_request);
3219                                ceph_con_send(&session->s_con, req->r_request);
3220                        }
3221                }
3222        }
3223        mutex_unlock(&mdsc->mutex);
3224}
3225
3226static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3227{
3228        struct ceph_msg *reply;
3229        struct ceph_pagelist *_pagelist;
3230        struct page *page;
3231        __le32 *addr;
3232        int err = -ENOMEM;
3233
3234        if (!recon_state->allow_multi)
3235                return -ENOSPC;
3236
3237        /* can't handle message that contains both caps and realm */
3238        BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3239
3240        /* pre-allocate new pagelist */
3241        _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3242        if (!_pagelist)
3243                return -ENOMEM;
3244
3245        reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3246        if (!reply)
3247                goto fail_msg;
3248
3249        /* placeholder for nr_caps */
3250        err = ceph_pagelist_encode_32(_pagelist, 0);
3251        if (err < 0)
3252                goto fail;
3253
3254        if (recon_state->nr_caps) {
3255                /* currently encoding caps */
3256                err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3257                if (err)
3258                        goto fail;
3259        } else {
3260                /* placeholder for nr_realms (currently encoding realms) */
3261                err = ceph_pagelist_encode_32(_pagelist, 0);
3262                if (err < 0)
3263                        goto fail;
3264        }
3265
3266        err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3267        if (err)
3268                goto fail;
3269
3270        page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3271        addr = kmap_atomic(page);
3272        if (recon_state->nr_caps) {
3273                /* currently encoding caps */
3274                *addr = cpu_to_le32(recon_state->nr_caps);
3275        } else {
3276                /* currently encoding realms */
3277                *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3278        }
3279        kunmap_atomic(addr);
3280
3281        reply->hdr.version = cpu_to_le16(5);
3282        reply->hdr.compat_version = cpu_to_le16(4);
3283
3284        reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3285        ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3286
3287        ceph_con_send(&recon_state->session->s_con, reply);
3288        ceph_pagelist_release(recon_state->pagelist);
3289
3290        recon_state->pagelist = _pagelist;
3291        recon_state->nr_caps = 0;
3292        recon_state->nr_realms = 0;
3293        recon_state->msg_version = 5;
3294        return 0;
3295fail:
3296        ceph_msg_put(reply);
3297fail_msg:
3298        ceph_pagelist_release(_pagelist);
3299        return err;
3300}
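/*
 * A sketch of how a partial (v5) reconnect chunk is laid out by the
 * function above.  The counts live at fixed offsets in the first
 * pagelist page, which is why they can be patched in via kmap once
 * encoding is done:
 *
 *   caps chunk:    le32 nr_caps    (patched at offset 0)
 *                  ...cap records...
 *                  le32 nr_realms = 0
 *                  u8 more = 1
 *
 *   realms chunk:  le32 nr_caps = 0
 *                  le32 nr_realms (patched at offset 4)
 *                  ...realm records...
 *                  u8 more = 1
 */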
3301
3302/*
3303 * Encode information about a cap for a reconnect with the MDS.
3304 */
3305static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
3306                          void *arg)
3307{
3308        union {
3309                struct ceph_mds_cap_reconnect v2;
3310                struct ceph_mds_cap_reconnect_v1 v1;
3311        } rec;
3312        struct ceph_inode_info *ci = cap->ci;
3313        struct ceph_reconnect_state *recon_state = arg;
3314        struct ceph_pagelist *pagelist = recon_state->pagelist;
3315        int err;
3316        u64 snap_follows;
3317
3318        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3319             inode, ceph_vinop(inode), cap, cap->cap_id,
3320             ceph_cap_string(cap->issued));
3321
3322        spin_lock(&ci->i_ceph_lock);
3323        cap->seq = 0;        /* reset cap seq */
3324        cap->issue_seq = 0;  /* and issue_seq */
3325        cap->mseq = 0;       /* and migrate_seq */
3326        cap->cap_gen = cap->session->s_cap_gen;
3327
3328        if (recon_state->msg_version >= 2) {
3329                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3330                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3331                rec.v2.issued = cpu_to_le32(cap->issued);
3332                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3333                rec.v2.pathbase = 0;
3334                rec.v2.flock_len = (__force __le32)
3335                        ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3336        } else {
3337                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3338                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3339                rec.v1.issued = cpu_to_le32(cap->issued);
3340                rec.v1.size = cpu_to_le64(inode->i_size);
3341                ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3342                ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3343                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3344                rec.v1.pathbase = 0;
3345        }
3346
3347        if (list_empty(&ci->i_cap_snaps)) {
3348                snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3349        } else {
3350                struct ceph_cap_snap *capsnap =
3351                        list_first_entry(&ci->i_cap_snaps,
3352                                         struct ceph_cap_snap, ci_item);
3353                snap_follows = capsnap->follows;
3354        }
3355        spin_unlock(&ci->i_ceph_lock);
3356
3357        if (recon_state->msg_version >= 2) {
3358                int num_fcntl_locks, num_flock_locks;
3359                struct ceph_filelock *flocks = NULL;
3360                size_t struct_len, total_len = sizeof(u64);
3361                u8 struct_v = 0;
3362
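                /*
                 * Counting and copying the locks are two separate
                 * steps, so a new lock can race in between; in that
                 * case ceph_encode_locks_to_buffer() returns -ENOSPC
                 * and we retry from here with a fresh count.
                 */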
3363encode_again:
3364                if (rec.v2.flock_len) {
3365                        ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3366                } else {
3367                        num_fcntl_locks = 0;
3368                        num_flock_locks = 0;
3369                }
3370                if (num_fcntl_locks + num_flock_locks > 0) {
3371                        flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3372                                               sizeof(struct ceph_filelock),
3373                                               GFP_NOFS);
3374                        if (!flocks) {
3375                                err = -ENOMEM;
3376                                goto out_err;
3377                        }
3378                        err = ceph_encode_locks_to_buffer(inode, flocks,
3379                                                          num_fcntl_locks,
3380                                                          num_flock_locks);
3381                        if (err) {
3382                                kfree(flocks);
3383                                flocks = NULL;
3384                                if (err == -ENOSPC)
3385                                        goto encode_again;
3386                                goto out_err;
3387                        }
3388                } else {
3389                        kfree(flocks);
3390                        flocks = NULL;
3391                }
3392
3393                if (recon_state->msg_version >= 3) {
3394                        /* version, compat_version and struct_len */
3395                        total_len += 2 * sizeof(u8) + sizeof(u32);
3396                        struct_v = 2;
3397                }
3398                /*
3399                 * number of encoded locks is stable, so copy to pagelist
3400                 */
3401                struct_len = 2 * sizeof(u32) +
3402                            (num_fcntl_locks + num_flock_locks) *
3403                            sizeof(struct ceph_filelock);
3404                rec.v2.flock_len = cpu_to_le32(struct_len);
3405
3406                struct_len += sizeof(u32) + sizeof(rec.v2);
3407
3408                if (struct_v >= 2)
3409                        struct_len += sizeof(u64); /* snap_follows */
3410
3411                total_len += struct_len;
3412
3413                if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3414                        err = send_reconnect_partial(recon_state);
3415                        if (err)
3416                                goto out_freeflocks;
3417                        pagelist = recon_state->pagelist;
3418                }
3419
3420                err = ceph_pagelist_reserve(pagelist, total_len);
3421                if (err)
3422                        goto out_freeflocks;
3423
3424                ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3425                if (recon_state->msg_version >= 3) {
3426                        ceph_pagelist_encode_8(pagelist, struct_v);
3427                        ceph_pagelist_encode_8(pagelist, 1);
3428                        ceph_pagelist_encode_32(pagelist, struct_len);
3429                }
3430                ceph_pagelist_encode_string(pagelist, NULL, 0);
3431                ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3432                ceph_locks_to_pagelist(flocks, pagelist,
3433                                       num_fcntl_locks, num_flock_locks);
3434                if (struct_v >= 2)
3435                        ceph_pagelist_encode_64(pagelist, snap_follows);
3436out_freeflocks:
3437                kfree(flocks);
3438        } else {
3439                u64 pathbase = 0;
3440                int pathlen = 0;
3441                char *path = NULL;
3442                struct dentry *dentry;
3443
3444                dentry = d_find_alias(inode);
3445                if (dentry) {
3446                        path = ceph_mdsc_build_path(dentry,
3447                                                &pathlen, &pathbase, 0);
3448                        dput(dentry);
3449                        if (IS_ERR(path)) {
3450                                err = PTR_ERR(path);
3451                                goto out_err;
3452                        }
3453                        rec.v1.pathbase = cpu_to_le64(pathbase);
3454                }
3455
3456                err = ceph_pagelist_reserve(pagelist,
3457                                            sizeof(u64) + sizeof(u32) +
3458                                            pathlen + sizeof(rec.v1));
3459                if (err) {
3460                        goto out_freepath;
3461                }
3462
3463                ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3464                ceph_pagelist_encode_string(pagelist, path, pathlen);
3465                ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3466out_freepath:
3467                ceph_mdsc_free_path(path, pathlen);
3468        }
3469
3470out_err:
3471        if (err >= 0)
3472                recon_state->nr_caps++;
3473        return err;
3474}
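/*
 * For reference, the v2/v3 per-cap record appended above is, roughly:
 *
 *   le64 ino;
 *   (v3+) u8 struct_v; u8 struct_compat; le32 struct_len;
 *   string path (empty for the v2+ formats);
 *   struct ceph_mds_cap_reconnect rec;
 *   flock blob: le32 fcntl count + entries, le32 flock count + entries;
 *   (struct_v >= 2) le64 snap_follows;
 */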
3475
3476static int encode_snap_realms(struct ceph_mds_client *mdsc,
3477                              struct ceph_reconnect_state *recon_state)
3478{
3479        struct rb_node *p;
3480        struct ceph_pagelist *pagelist = recon_state->pagelist;
3481        int err = 0;
3482
3483        if (recon_state->msg_version >= 4) {
3484                err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3485                if (err < 0)
3486                        goto fail;
3487        }
3488
3489        /*
3490         * snaprealms.  we provide mds with the ino, seq (version), and
3491         * parent for all of our realms.  If the mds has any newer info,
3492         * it will tell us.
3493         */
3494        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3495                struct ceph_snap_realm *realm =
3496                       rb_entry(p, struct ceph_snap_realm, node);
3497                struct ceph_mds_snaprealm_reconnect sr_rec;
3498
3499                if (recon_state->msg_version >= 4) {
3500                        size_t need = sizeof(u8) * 2 + sizeof(u32) +
3501                                      sizeof(sr_rec);
3502
3503                        if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3504                                err = send_reconnect_partial(recon_state);
3505                                if (err)
3506                                        goto fail;
3507                                pagelist = recon_state->pagelist;
3508                        }
3509
3510                        err = ceph_pagelist_reserve(pagelist, need);
3511                        if (err)
3512                                goto fail;
3513
3514                        ceph_pagelist_encode_8(pagelist, 1);
3515                        ceph_pagelist_encode_8(pagelist, 1);
3516                        ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3517                }
3518
3519                dout(" adding snap realm %llx seq %lld parent %llx\n",
3520                     realm->ino, realm->seq, realm->parent_ino);
3521                sr_rec.ino = cpu_to_le64(realm->ino);
3522                sr_rec.seq = cpu_to_le64(realm->seq);
3523                sr_rec.parent = cpu_to_le64(realm->parent_ino);
3524
3525                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3526                if (err)
3527                        goto fail;
3528
3529                recon_state->nr_realms++;
3530        }
3531fail:
3532        return err;
3533}
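/*
 * Each realm record emitted above is, roughly:
 *
 *   (v4+) u8 struct_v = 1; u8 struct_compat = 1; le32 struct_len;
 *   struct ceph_mds_snaprealm_reconnect { le64 ino, seq, parent; };
 */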
3534
3535
3536/*
3537 * If an MDS fails and recovers, clients need to reconnect in order to
3538 * reestablish shared state.  This includes all caps issued through
3539 * this session _and_ the snap_realm hierarchy.  Because it's not
3540 * clear which snap realms the mds cares about, we send everything we
3541 * know about... that ensures we'll then get any new info the
3542 * recovering MDS might have.
3543 *
3544 * This is a relatively heavyweight operation, but it's rare.
3545 *
3546 * called with mdsc->mutex held.
3547 */
3548static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3549                               struct ceph_mds_session *session)
3550{
3551        struct ceph_msg *reply;
3552        int mds = session->s_mds;
3553        int err = -ENOMEM;
3554        struct ceph_reconnect_state recon_state = {
3555                .session = session,
3556        };
3557        LIST_HEAD(dispose);
3558
3559        pr_info("mds%d reconnect start\n", mds);
3560
3561        recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3562        if (!recon_state.pagelist)
3563                goto fail_nopagelist;
3564
3565        reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3566        if (!reply)
3567                goto fail_nomsg;
3568
3569        mutex_lock(&session->s_mutex);
3570        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3571        session->s_seq = 0;
3572
3573        dout("session %p state %s\n", session,
3574             ceph_session_state_name(session->s_state));
3575
3576        spin_lock(&session->s_gen_ttl_lock);
3577        session->s_cap_gen++;
3578        spin_unlock(&session->s_gen_ttl_lock);
3579
3580        spin_lock(&session->s_cap_lock);
3581        /* don't know if session is readonly */
3582        session->s_readonly = 0;
3583        /*
3584         * notify __ceph_remove_cap() that we are composing cap reconnect.
3585         * If a cap gets released before being added to the cap reconnect,
3586         * __ceph_remove_cap() should skip queuing cap release.
3587         */
3588        session->s_cap_reconnect = 1;
3589        /* drop old cap expires; we're about to reestablish that state */
3590        detach_cap_releases(session, &dispose);
3591        spin_unlock(&session->s_cap_lock);
3592        dispose_cap_releases(mdsc, &dispose);
3593
3594        /* trim unused caps to reduce MDS's cache rejoin time */
3595        if (mdsc->fsc->sb->s_root)
3596                shrink_dcache_parent(mdsc->fsc->sb->s_root);
3597
3598        ceph_con_close(&session->s_con);
3599        ceph_con_open(&session->s_con,
3600                      CEPH_ENTITY_TYPE_MDS, mds,
3601                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3602
3603        /* replay unsafe requests */
3604        replay_unsafe_requests(mdsc, session);
3605
3606        ceph_early_kick_flushing_caps(mdsc, session);
3607
3608        down_read(&mdsc->snap_rwsem);
3609
3610        /* placeholder for nr_caps */
3611        err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3612        if (err)
3613                goto fail;
3614
3615        if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3616                recon_state.msg_version = 3;
3617                recon_state.allow_multi = true;
3618        } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3619                recon_state.msg_version = 3;
3620        } else {
3621                recon_state.msg_version = 2;
3622        }
3623        /* traverse this session's caps */
3624        err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
3625
3626        spin_lock(&session->s_cap_lock);
3627        session->s_cap_reconnect = 0;
3628        spin_unlock(&session->s_cap_lock);
3629
3630        if (err < 0)
3631                goto fail;
3632
3633        /* check if all realms can be encoded into current message */
3634        if (mdsc->num_snap_realms) {
3635                size_t total_len =
3636                        recon_state.pagelist->length +
3637                        mdsc->num_snap_realms *
3638                        sizeof(struct ceph_mds_snaprealm_reconnect);
3639                if (recon_state.msg_version >= 4) {
3640                        /* number of realms */
3641                        total_len += sizeof(u32);
3642                        /* version, compat_version and struct_len */
3643                        total_len += mdsc->num_snap_realms *
3644                                     (2 * sizeof(u8) + sizeof(u32));
3645                }
3646                if (total_len > RECONNECT_MAX_SIZE) {
3647                        if (!recon_state.allow_multi) {
3648                                err = -ENOSPC;
3649                                goto fail;
3650                        }
3651                        if (recon_state.nr_caps) {
3652                                err = send_reconnect_partial(&recon_state);
3653                                if (err)
3654                                        goto fail;
3655                        }
3656                        recon_state.msg_version = 5;
3657                }
3658        }
3659
3660        err = encode_snap_realms(mdsc, &recon_state);
3661        if (err < 0)
3662                goto fail;
3663
3664        if (recon_state.msg_version >= 5) {
3665                err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3666                if (err < 0)
3667                        goto fail;
3668        }
3669
3670        if (recon_state.nr_caps || recon_state.nr_realms) {
3671                struct page *page =
3672                        list_first_entry(&recon_state.pagelist->head,
3673                                        struct page, lru);
3674                __le32 *addr = kmap_atomic(page);
3675                if (recon_state.nr_caps) {
3676                        WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3677                        *addr = cpu_to_le32(recon_state.nr_caps);
3678                } else if (recon_state.msg_version >= 4) {
3679                        *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3680                }
3681                kunmap_atomic(addr);
3682        }
3683
3684        reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3685        if (recon_state.msg_version >= 4)
3686                reply->hdr.compat_version = cpu_to_le16(4);
3687
3688        reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3689        ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3690
3691        ceph_con_send(&session->s_con, reply);
3692
3693        mutex_unlock(&session->s_mutex);
3694
3695        mutex_lock(&mdsc->mutex);
3696        __wake_requests(mdsc, &session->s_waiting);
3697        mutex_unlock(&mdsc->mutex);
3698
3699        up_read(&mdsc->snap_rwsem);
3700        ceph_pagelist_release(recon_state.pagelist);
3701        return;
3702
3703fail:
3704        ceph_msg_put(reply);
3705        up_read(&mdsc->snap_rwsem);
3706        mutex_unlock(&session->s_mutex);
3707fail_nomsg:
3708        ceph_pagelist_release(recon_state.pagelist);
3709fail_nopagelist:
3710        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3711        return;
3712}
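/*
 * Version selection above, in short: v2 is the legacy format; v3 is
 * used when the peer supports CEPH_FEATURE_MDSENC or the
 * multi-reconnect feature; v5 (with compat_version 4) is used once
 * the payload has to be split across several messages.  The
 * msg_version >= 4 checks cover the envelope shared by the v4+
 * formats, but this client itself only sends v2, v3 or v5.
 */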
3713
3714
3715/*
3716 * compare old and new mdsmaps, kicking requests
3717 * and closing out old connections as necessary
3718 *
3719 * called under mdsc->mutex.
3720 */
3721static void check_new_map(struct ceph_mds_client *mdsc,
3722                          struct ceph_mdsmap *newmap,
3723                          struct ceph_mdsmap *oldmap)
3724{
3725        int i;
3726        int oldstate, newstate;
3727        struct ceph_mds_session *s;
3728
3729        dout("check_new_map new %u old %u\n",
3730             newmap->m_epoch, oldmap->m_epoch);
3731
3732        for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3733                if (!mdsc->sessions[i])
3734                        continue;
3735                s = mdsc->sessions[i];
3736                oldstate = ceph_mdsmap_get_state(oldmap, i);
3737                newstate = ceph_mdsmap_get_state(newmap, i);
3738
3739                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3740                     i, ceph_mds_state_name(oldstate),
3741                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3742                     ceph_mds_state_name(newstate),
3743                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3744                     ceph_session_state_name(s->s_state));
3745
3746                if (i >= newmap->m_num_mds) {
3747                        /* force close session for stopped mds */
3748                        get_session(s);
3749                        __unregister_session(mdsc, s);
3750                        __wake_requests(mdsc, &s->s_waiting);
3751                        mutex_unlock(&mdsc->mutex);
3752
3753                        mutex_lock(&s->s_mutex);
3754                        cleanup_session_requests(mdsc, s);
3755                        remove_session_caps(s);
3756                        mutex_unlock(&s->s_mutex);
3757
3758                        ceph_put_mds_session(s);
3759
3760                        mutex_lock(&mdsc->mutex);
3761                        kick_requests(mdsc, i);
3762                        continue;
3763                }
3764
3765                if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
3766                           ceph_mdsmap_get_addr(newmap, i),
3767                           sizeof(struct ceph_entity_addr))) {
3768                        /* just close it */
3769                        mutex_unlock(&mdsc->mutex);
3770                        mutex_lock(&s->s_mutex);
3771                        mutex_lock(&mdsc->mutex);
3772                        ceph_con_close(&s->s_con);
3773                        mutex_unlock(&s->s_mutex);
3774                        s->s_state = CEPH_MDS_SESSION_RESTARTING;
3775                } else if (oldstate == newstate) {
3776                        continue;  /* nothing new with this mds */
3777                }
3778
3779                /*
3780                 * send reconnect?
3781                 */
3782                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3783                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3784                        mutex_unlock(&mdsc->mutex);
3785                        send_mds_reconnect(mdsc, s);
3786                        mutex_lock(&mdsc->mutex);
3787                }
3788
3789                /*
3790                 * kick request on any mds that has gone active.
3791                 */
3792                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3793                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3794                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3795                            oldstate != CEPH_MDS_STATE_STARTING)
3796                                pr_info("mds%d recovery completed\n", s->s_mds);
3797                        kick_requests(mdsc, i);
3798                        ceph_kick_flushing_caps(mdsc, s);
3799                        wake_up_session_caps(s, RECONNECT);
3800                }
3801        }
3802
3803        for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3804                s = mdsc->sessions[i];
3805                if (!s)
3806                        continue;
3807                if (!ceph_mdsmap_is_laggy(newmap, i))
3808                        continue;
3809                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3810                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3811                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3812                        dout(" connecting to export targets of laggy mds%d\n",
3813                             i);
3814                        __open_export_target_sessions(mdsc, s);
3815                }
3816        }
3817}
3818
3819
3820
3821/*
3822 * leases
3823 */
3824
3825/*
3826 * caller must hold session s_mutex, dentry->d_lock
3827 */
3828void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3829{
3830        struct ceph_dentry_info *di = ceph_dentry(dentry);
3831
3832        ceph_put_mds_session(di->lease_session);
3833        di->lease_session = NULL;
3834}
3835
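/*
 * The front of a CEPH_MSG_CLIENT_LEASE message, as decoded below:
 *
 *   struct ceph_mds_lease h;     action, ino, seq, duration_ms, ...
 *   le32 dname_len;
 *   char dname[dname_len];       not NUL-terminated
 */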
3836static void handle_lease(struct ceph_mds_client *mdsc,
3837                         struct ceph_mds_session *session,
3838                         struct ceph_msg *msg)
3839{
3840        struct super_block *sb = mdsc->fsc->sb;
3841        struct inode *inode;
3842        struct dentry *parent, *dentry;
3843        struct ceph_dentry_info *di;
3844        int mds = session->s_mds;
3845        struct ceph_mds_lease *h = msg->front.iov_base;
3846        u32 seq;
3847        struct ceph_vino vino;
3848        struct qstr dname;
3849        int release = 0;
3850
3851        dout("handle_lease from mds%d\n", mds);
3852
3853        /* decode */
3854        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3855                goto bad;
3856        vino.ino = le64_to_cpu(h->ino);
3857        vino.snap = CEPH_NOSNAP;
3858        seq = le32_to_cpu(h->seq);
3859        dname.len = get_unaligned_le32(h + 1);
3860        if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
3861                goto bad;
3862        dname.name = (void *)(h + 1) + sizeof(u32);
3863
3864        /* lookup inode */
3865        inode = ceph_find_inode(sb, vino);
3866        dout("handle_lease %s, ino %llx %p %.*s\n",
3867             ceph_lease_op_name(h->action), vino.ino, inode,
3868             dname.len, dname.name);
3869
3870        mutex_lock(&session->s_mutex);
3871        session->s_seq++;
3872
3873        if (!inode) {
3874                dout("handle_lease no inode %llx\n", vino.ino);
3875                goto release;
3876        }
3877
3878        /* dentry */
3879        parent = d_find_alias(inode);
3880        if (!parent) {
3881                dout("no parent dentry on inode %p\n", inode);
3882                WARN_ON(1);
3883                goto release;  /* hrm... */
3884        }
3885        dname.hash = full_name_hash(parent, dname.name, dname.len);
3886        dentry = d_lookup(parent, &dname);
3887        dput(parent);
3888        if (!dentry)
3889                goto release;
3890
3891        spin_lock(&dentry->d_lock);
3892        di = ceph_dentry(dentry);
3893        switch (h->action) {
3894        case CEPH_MDS_LEASE_REVOKE:
3895                if (di->lease_session == session) {
3896                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3897                                h->seq = cpu_to_le32(di->lease_seq);
3898                        __ceph_mdsc_drop_dentry_lease(dentry);
3899                }
3900                release = 1;
3901                break;
3902
3903        case CEPH_MDS_LEASE_RENEW:
3904                if (di->lease_session == session &&
3905                    di->lease_gen == session->s_cap_gen &&
3906                    di->lease_renew_from &&
3907                    di->lease_renew_after == 0) {
3908                        unsigned long duration =
3909                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3910
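                        /*
                         * The renewed lease runs for 'duration' from
                         * the time the renew was sent; arrange for the
                         * next renew attempt at the halfway point.
                         */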
3911                        di->lease_seq = seq;
3912                        di->time = di->lease_renew_from + duration;
3913                        di->lease_renew_after = di->lease_renew_from +
3914                                (duration >> 1);
3915                        di->lease_renew_from = 0;
3916                }
3917                break;
3918        }
3919        spin_unlock(&dentry->d_lock);
3920        dput(dentry);
3921
3922        if (!release)
3923                goto out;
3924
3925release:
3926        /* let's just reuse the same message */
3927        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3928        ceph_msg_get(msg);
3929        ceph_con_send(&session->s_con, msg);
3930
3931out:
3932        mutex_unlock(&session->s_mutex);
3933        /* avoid calling iput_final() in mds dispatch threads */
3934        ceph_async_iput(inode);
3935        return;
3936
3937bad:
3938        pr_err("corrupt lease message\n");
3939        ceph_msg_dump(msg);
3940}
3941
3942void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3943                              struct dentry *dentry, char action,
3944                              u32 seq)
3945{
3946        struct ceph_msg *msg;
3947        struct ceph_mds_lease *lease;
3948        struct inode *dir;
3949        int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
3950
3951        dout("lease_send_msg dentry %p %s to mds%d\n",
3952             dentry, ceph_lease_op_name(action), session->s_mds);
3953
3954        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3955        if (!msg)
3956                return;
3957        lease = msg->front.iov_base;
3958        lease->action = action;
3959        lease->seq = cpu_to_le32(seq);
3960
3961        spin_lock(&dentry->d_lock);
3962        dir = d_inode(dentry->d_parent);
3963        lease->ino = cpu_to_le64(ceph_ino(dir));
3964        lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
3965
3966        put_unaligned_le32(dentry->d_name.len, lease + 1);
3967        memcpy((void *)(lease + 1) + 4,
3968               dentry->d_name.name, dentry->d_name.len);
3969        spin_unlock(&dentry->d_lock);
3970        /*
3971         * if this is a preemptive lease RELEASE, no need to
3972         * flush request stream, since the actual request will
3973         * soon follow.
3974         */
3975        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3976
3977        ceph_con_send(&session->s_con, msg);
3978}
3979
3980/*
3981 * lock and unlock each session, to wait for in-flight session activity to finish
3982 */
3983static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
3984{
3985        int i;
3986
3987        mutex_lock(&mdsc->mutex);
3988        for (i = 0; i < mdsc->max_sessions; i++) {
3989                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3990                if (!s)
3991                        continue;
3992                mutex_unlock(&mdsc->mutex);
3993                mutex_lock(&s->s_mutex);
3994                mutex_unlock(&s->s_mutex);
3995                ceph_put_mds_session(s);
3996                mutex_lock(&mdsc->mutex);
3997        }
3998        mutex_unlock(&mdsc->mutex);
3999}
4000
4001
4002
4003/*
4004 * delayed work -- periodically trim expired leases, renew caps with mds
4005 */
4006static void schedule_delayed(struct ceph_mds_client *mdsc)
4007{
4008        int delay = 5;
4009        unsigned hz = round_jiffies_relative(HZ * delay);
4010        schedule_delayed_work(&mdsc->delayed_work, hz);
4011}
4012
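/*
 * Runs (roughly) every 5 seconds.  Caps are renewed once a quarter of
 * the mdsmap session timeout has elapsed since the last renewal;
 * otherwise each open session just gets a keepalive.  Also resends
 * close requests for CLOSING sessions and drives cap release, cap
 * reclaim, delayed cap checks and snapid map trimming.
 */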
4013static void delayed_work(struct work_struct *work)
4014{
4015        int i;
4016        struct ceph_mds_client *mdsc =
4017                container_of(work, struct ceph_mds_client, delayed_work.work);
4018        int renew_interval;
4019        int renew_caps;
4020
4021        dout("mdsc delayed_work\n");
4022
4023        mutex_lock(&mdsc->mutex);
4024        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4025        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4026                                   mdsc->last_renew_caps);
4027        if (renew_caps)
4028                mdsc->last_renew_caps = jiffies;
4029
4030        for (i = 0; i < mdsc->max_sessions; i++) {
4031                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4032                if (!s)
4033                        continue;
4034                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4035                        dout("resending session close request for mds%d\n",
4036                             s->s_mds);
4037                        request_close_session(mdsc, s);
4038                        ceph_put_mds_session(s);
4039                        continue;
4040                }
4041                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4042                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
4043                                s->s_state = CEPH_MDS_SESSION_HUNG;
4044                                pr_info("mds%d hung\n", s->s_mds);
4045                        }
4046                }
4047                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
4048                        /* this mds is failed or recovering, just wait */
4049                        ceph_put_mds_session(s);
4050                        continue;
4051                }
4052                mutex_unlock(&mdsc->mutex);
4053
4054                mutex_lock(&s->s_mutex);
4055                if (renew_caps)
4056                        send_renew_caps(mdsc, s);
4057                else
4058                        ceph_con_keepalive(&s->s_con);
4059                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4060                    s->s_state == CEPH_MDS_SESSION_HUNG)
4061                        ceph_send_cap_releases(mdsc, s);
4062                mutex_unlock(&s->s_mutex);
4063                ceph_put_mds_session(s);
4064
4065                mutex_lock(&mdsc->mutex);
4066        }
4067        mutex_unlock(&mdsc->mutex);
4068
4069        ceph_check_delayed_caps(mdsc);
4070
4071        ceph_queue_cap_reclaim_work(mdsc);
4072
4073        ceph_trim_snapid_map(mdsc);
4074
4075        schedule_delayed(mdsc);
4076}
4077
4078int ceph_mdsc_init(struct ceph_fs_client *fsc)
4080{
4081        struct ceph_mds_client *mdsc;
4082
4083        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4084        if (!mdsc)
4085                return -ENOMEM;
4086        mdsc->fsc = fsc;
4087        mutex_init(&mdsc->mutex);
4088        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4089        if (!mdsc->mdsmap) {
4090                kfree(mdsc);
4091                return -ENOMEM;
4092        }
4093
4094        fsc->mdsc = mdsc;
4095        init_completion(&mdsc->safe_umount_waiters);
4096        init_waitqueue_head(&mdsc->session_close_wq);
4097        INIT_LIST_HEAD(&mdsc->waiting_for_map);
4098        mdsc->sessions = NULL;
4099        atomic_set(&mdsc->num_sessions, 0);
4100        mdsc->max_sessions = 0;
4101        mdsc->stopping = 0;
4102        atomic64_set(&mdsc->quotarealms_count, 0);
4103        mdsc->quotarealms_inodes = RB_ROOT;
4104        mutex_init(&mdsc->quotarealms_inodes_mutex);
4105        mdsc->last_snap_seq = 0;
4106        init_rwsem(&mdsc->snap_rwsem);
4107        mdsc->snap_realms = RB_ROOT;
4108        INIT_LIST_HEAD(&mdsc->snap_empty);
4109        mdsc->num_snap_realms = 0;
4110        spin_lock_init(&mdsc->snap_empty_lock);
4111        mdsc->last_tid = 0;
4112        mdsc->oldest_tid = 0;
4113        mdsc->request_tree = RB_ROOT;
4114        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4115        mdsc->last_renew_caps = jiffies;
4116        INIT_LIST_HEAD(&mdsc->cap_delay_list);
4117        spin_lock_init(&mdsc->cap_delay_lock);
4118        INIT_LIST_HEAD(&mdsc->snap_flush_list);
4119        spin_lock_init(&mdsc->snap_flush_lock);
4120        mdsc->last_cap_flush_tid = 1;
4121        INIT_LIST_HEAD(&mdsc->cap_flush_list);
4122        INIT_LIST_HEAD(&mdsc->cap_dirty);
4123        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4124        mdsc->num_cap_flushing = 0;
4125        spin_lock_init(&mdsc->cap_dirty_lock);
4126        init_waitqueue_head(&mdsc->cap_flushing_wq);
4127        INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4128        atomic_set(&mdsc->cap_reclaim_pending, 0);
4129
4130        spin_lock_init(&mdsc->dentry_list_lock);
4131        INIT_LIST_HEAD(&mdsc->dentry_leases);
4132        INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4133
4134        ceph_caps_init(mdsc);
4135        ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4136
4137        spin_lock_init(&mdsc->snapid_map_lock);
4138        mdsc->snapid_map_tree = RB_ROOT;
4139        INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4140
4141        init_rwsem(&mdsc->pool_perm_rwsem);
4142        mdsc->pool_perm_tree = RB_ROOT;
4143
4144        strscpy(mdsc->nodename, utsname()->nodename,
4145                sizeof(mdsc->nodename));
4146        return 0;
4147}
4148
4149/*
4150 * Wait for safe replies on open mds requests.  If we time out, drop
4151 * all requests from the tree to avoid dangling dentry refs.
4152 */
4153static void wait_requests(struct ceph_mds_client *mdsc)
4154{
4155        struct ceph_options *opts = mdsc->fsc->client->options;
4156        struct ceph_mds_request *req;
4157
4158        mutex_lock(&mdsc->mutex);
4159        if (__get_oldest_req(mdsc)) {
4160                mutex_unlock(&mdsc->mutex);
4161
4162                dout("wait_requests waiting for requests\n");
4163                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4164                                    ceph_timeout_jiffies(opts->mount_timeout));
4165
4166                /* tear down remaining requests */
4167                mutex_lock(&mdsc->mutex);
4168                while ((req = __get_oldest_req(mdsc))) {
4169                        dout("wait_requests timed out on tid %llu\n",
4170                             req->r_tid);
4171                        list_del_init(&req->r_wait);
4172                        __unregister_request(mdsc, req);
4173                }
4174        }
4175        mutex_unlock(&mdsc->mutex);
4176        dout("wait_requests done\n");
4177}
4178
4179/*
4180 * called before mount is ro, and before dentries are torn down.
4181 * (hmm, does this still race with new lookups?)
4182 */
4183void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4184{
4185        dout("pre_umount\n");
4186        mdsc->stopping = 1;
4187
4188        lock_unlock_sessions(mdsc);
4189        ceph_flush_dirty_caps(mdsc);
4190        wait_requests(mdsc);
4191
4192        /*
4193         * wait for reply handlers to drop their request refs and
4194         * their inode/dcache refs
4195         */
4196        ceph_msgr_flush();
4197
4198        ceph_cleanup_quotarealms_inodes(mdsc);
4199}
4200
4201/*
4202 * wait for all write mds requests to flush.
4203 */
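/*
 * Note the iteration pattern below: a ref is taken on both the
 * current and the next request before mdsc->mutex is dropped to wait,
 * so neither rbtree node can vanish under us; if the next request was
 * unregistered while we slept (RB_EMPTY_NODE), the scan restarts from
 * the oldest tid.
 */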
4204static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4205{
4206        struct ceph_mds_request *req = NULL, *nextreq;
4207        struct rb_node *n;
4208
4209        mutex_lock(&mdsc->mutex);
4210        dout("wait_unsafe_requests want %lld\n", want_tid);
4211restart:
4212        req = __get_oldest_req(mdsc);
4213        while (req && req->r_tid <= want_tid) {
4214                /* find next request */
4215                n = rb_next(&req->r_node);
4216                if (n)
4217                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4218                else
4219                        nextreq = NULL;
4220                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4221                    (req->r_op & CEPH_MDS_OP_WRITE)) {
4222                        /* write op */
4223                        ceph_mdsc_get_request(req);
4224                        if (nextreq)
4225                                ceph_mdsc_get_request(nextreq);
4226                        mutex_unlock(&mdsc->mutex);
4227                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4228                             req->r_tid, want_tid);
4229                        wait_for_completion(&req->r_safe_completion);
4230                        mutex_lock(&mdsc->mutex);
4231                        ceph_mdsc_put_request(req);
4232                        if (!nextreq)
4233                                break;  /* next dne before, so we're done! */
4234                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
4235                                /* next request was removed from tree */
4236                                ceph_mdsc_put_request(nextreq);
4237                                goto restart;
4238                        }
4239                        ceph_mdsc_put_request(nextreq);  /* won't go away */
4240                }
4241                req = nextreq;
4242        }
4243        mutex_unlock(&mdsc->mutex);
4244        dout("wait_unsafe_requests done\n");
4245}
4246
4247void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4248{
4249        u64 want_tid, want_flush;
4250
4251        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4252                return;
4253
4254        dout("sync\n");
4255        mutex_lock(&mdsc->mutex);
4256        want_tid = mdsc->last_tid;
4257        mutex_unlock(&mdsc->mutex);
4258
4259        ceph_flush_dirty_caps(mdsc);
4260        spin_lock(&mdsc->cap_dirty_lock);
4261        want_flush = mdsc->last_cap_flush_tid;
4262        if (!list_empty(&mdsc->cap_flush_list)) {
4263                struct ceph_cap_flush *cf =
4264                        list_last_entry(&mdsc->cap_flush_list,
4265                                        struct ceph_cap_flush, g_list);
4266                cf->wake = true;
4267        }
4268        spin_unlock(&mdsc->cap_dirty_lock);
4269
4270        dout("sync want tid %lld flush_seq %lld\n",
4271             want_tid, want_flush);
4272
4273        wait_unsafe_requests(mdsc, want_tid);
4274        wait_caps_flush(mdsc, want_flush);
4275}
4276
4277/*
4278 * true if all sessions are closed, or we force unmount
4279 */
4280static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4281{
4282        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4283                return true;
4284        return atomic_read(&mdsc->num_sessions) <= skipped;
4285}
4286
4287/*
4288 * called after sb is ro.
4289 */
4290void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4291{
4292        struct ceph_options *opts = mdsc->fsc->client->options;
4293        struct ceph_mds_session *session;
4294        int i;
4295        int skipped = 0;
4296
4297        dout("close_sessions\n");
4298
4299        /* close sessions */
4300        mutex_lock(&mdsc->mutex);
4301        for (i = 0; i < mdsc->max_sessions; i++) {
4302                session = __ceph_lookup_mds_session(mdsc, i);
4303                if (!session)
4304                        continue;
4305                mutex_unlock(&mdsc->mutex);
4306                mutex_lock(&session->s_mutex);
4307                if (__close_session(mdsc, session) <= 0)
4308                        skipped++;
4309                mutex_unlock(&session->s_mutex);
4310                ceph_put_mds_session(session);
4311                mutex_lock(&mdsc->mutex);
4312        }
4313        mutex_unlock(&mdsc->mutex);
4314
4315        dout("waiting for sessions to close\n");
4316        wait_event_timeout(mdsc->session_close_wq,
4317                           done_closing_sessions(mdsc, skipped),
4318                           ceph_timeout_jiffies(opts->mount_timeout));
4319
4320        /* tear down remaining sessions */
4321        mutex_lock(&mdsc->mutex);
4322        for (i = 0; i < mdsc->max_sessions; i++) {
4323                if (mdsc->sessions[i]) {
4324                        session = get_session(mdsc->sessions[i]);
4325                        __unregister_session(mdsc, session);
4326                        mutex_unlock(&mdsc->mutex);
4327                        mutex_lock(&session->s_mutex);
4328                        remove_session_caps(session);
4329                        mutex_unlock(&session->s_mutex);
4330                        ceph_put_mds_session(session);
4331                        mutex_lock(&mdsc->mutex);
4332                }
4333        }
4334        WARN_ON(!list_empty(&mdsc->cap_delay_list));
4335        mutex_unlock(&mdsc->mutex);
4336
4337        ceph_cleanup_snapid_map(mdsc);
4338        ceph_cleanup_empty_realms(mdsc);
4339
4340        cancel_work_sync(&mdsc->cap_reclaim_work);
4341        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4342
4343        dout("stopped\n");
4344}
4345
4346void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4347{
4348        struct ceph_mds_session *session;
4349        int mds;
4350
4351        dout("force umount\n");
4352
4353        mutex_lock(&mdsc->mutex);
4354        for (mds = 0; mds < mdsc->max_sessions; mds++) {
4355                session = __ceph_lookup_mds_session(mdsc, mds);
4356                if (!session)
4357                        continue;
4358                mutex_unlock(&mdsc->mutex);
4359                mutex_lock(&session->s_mutex);
4360                __close_session(mdsc, session);
4361                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4362                        cleanup_session_requests(mdsc, session);
4363                        remove_session_caps(session);
4364                }
4365                mutex_unlock(&session->s_mutex);
4366                ceph_put_mds_session(session);
4367                mutex_lock(&mdsc->mutex);
4368                kick_requests(mdsc, mds);
4369        }
4370        __wake_requests(mdsc, &mdsc->waiting_for_map);
4371        mutex_unlock(&mdsc->mutex);
4372}
4373
4374static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4375{
4376        dout("stop\n");
4377        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4378        if (mdsc->mdsmap)
4379                ceph_mdsmap_destroy(mdsc->mdsmap);
4380        kfree(mdsc->sessions);
4381        ceph_caps_finalize(mdsc);
4382        ceph_pool_perm_destroy(mdsc);
4383}
4384
4385void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4386{
4387        struct ceph_mds_client *mdsc = fsc->mdsc;
4388        dout("mdsc_destroy %p\n", mdsc);
4389
4390        if (!mdsc)
4391                return;
4392
4393        /* flush out any connection work with references to us */
4394        ceph_msgr_flush();
4395
4396        ceph_mdsc_stop(mdsc);
4397
4398        fsc->mdsc = NULL;
4399        kfree(mdsc);
4400        dout("mdsc_destroy %p done\n", mdsc);
4401}
4402
4403void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4404{
4405        struct ceph_fs_client *fsc = mdsc->fsc;
4406        const char *mds_namespace = fsc->mount_options->mds_namespace;
4407        void *p = msg->front.iov_base;
4408        void *end = p + msg->front.iov_len;
4409        u32 epoch;
4410        u32 map_len;
4411        u32 num_fs;
4412        u32 mount_fscid = (u32)-1;
4413        u8 struct_v, struct_cv;
4414        int err = -EINVAL;
4415
4416        ceph_decode_need(&p, end, sizeof(u32), bad);
4417        epoch = ceph_decode_32(&p);
4418
4419        dout("handle_fsmap epoch %u\n", epoch);
4420
4421        ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4422        struct_v = ceph_decode_8(&p);
4423        struct_cv = ceph_decode_8(&p);
4424        map_len = ceph_decode_32(&p);
4425
4426        ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4427        p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4428
4429        num_fs = ceph_decode_32(&p);
4430        while (num_fs-- > 0) {
4431                void *info_p, *info_end;
4432                u32 info_len;
4433                u8 info_v, info_cv;
4434                u32 fscid, namelen;
4435
4436                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4437                info_v = ceph_decode_8(&p);
4438                info_cv = ceph_decode_8(&p);
4439                info_len = ceph_decode_32(&p);
4440                ceph_decode_need(&p, end, info_len, bad);
4441                info_p = p;
4442                info_end = p + info_len;
4443                p = info_end;
4444
4445                ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4446                fscid = ceph_decode_32(&info_p);
4447                namelen = ceph_decode_32(&info_p);
4448                ceph_decode_need(&info_p, info_end, namelen, bad);
4449
4450                if (mds_namespace &&
4451                    strlen(mds_namespace) == namelen &&
4452                    !strncmp(mds_namespace, (char *)info_p, namelen)) {
4453                        mount_fscid = fscid;
4454                        break;
4455                }
4456        }
4457
4458        ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4459        if (mount_fscid != (u32)-1) {
4460                fsc->client->monc.fs_cluster_id = mount_fscid;
4461                ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4462                                   0, true);
4463                ceph_monc_renew_subs(&fsc->client->monc);
4464        } else {
4465                err = -ENOENT;
4466                goto err_out;
4467        }
4468        return;
4469
4470bad:
4471        pr_err("error decoding fsmap\n");
4472err_out:
4473        mutex_lock(&mdsc->mutex);
4474        mdsc->mdsmap_err = err;
4475        __wake_requests(mdsc, &mdsc->waiting_for_map);
4476        mutex_unlock(&mdsc->mutex);
4477}
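/*
 * The mds_namespace scan above is what backs the corresponding mount
 * option, e.g. (illustrative):
 *
 *        mount -t ceph mon.host:6789:/ /mnt -o name=admin,mds_namespace=myfs
 *
 * If no filesystem in the FSMap matches the requested name, the mount
 * fails: -ENOENT is stored in mdsc->mdsmap_err and waiters are woken.
 */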
4478
4479/*
4480 * handle mds map update.
4481 */
4482void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4483{
4484        u32 epoch;
4485        u32 maplen;
4486        void *p = msg->front.iov_base;
4487        void *end = p + msg->front.iov_len;
4488        struct ceph_mdsmap *newmap, *oldmap;
4489        struct ceph_fsid fsid;
4490        int err = -EINVAL;
4491
4492        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4493        ceph_decode_copy(&p, &fsid, sizeof(fsid));
4494        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
4495                return;
4496        epoch = ceph_decode_32(&p);
4497        maplen = ceph_decode_32(&p);
4498        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
4499
4500        /* do we need it? */
4501        mutex_lock(&mdsc->mutex);
4502        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
4503                dout("handle_map epoch %u <= our %u\n",
4504                     epoch, mdsc->mdsmap->m_epoch);
4505                mutex_unlock(&mdsc->mutex);
4506                return;
4507        }
4508
4509        newmap = ceph_mdsmap_decode(&p, end);
4510        if (IS_ERR(newmap)) {
4511                err = PTR_ERR(newmap);
4512                goto bad_unlock;
4513        }
4514
4515        /* swap into place */
4516        if (mdsc->mdsmap) {
4517                oldmap = mdsc->mdsmap;
4518                mdsc->mdsmap = newmap;
4519                check_new_map(mdsc, newmap, oldmap);
4520                ceph_mdsmap_destroy(oldmap);
4521        } else {
4522                mdsc->mdsmap = newmap;  /* first mds map */
4523        }
4524        mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
4525                                        MAX_LFS_FILESIZE);
4526
4527        __wake_requests(mdsc, &mdsc->waiting_for_map);
4528        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4529                          mdsc->mdsmap->m_epoch);
4530
4531        mutex_unlock(&mdsc->mutex);
4532        schedule_delayed(mdsc);
4533        return;
4534
4535bad_unlock:
4536        mutex_unlock(&mdsc->mutex);
4537bad:
4538        pr_err("error decoding mdsmap %d\n", err);
4539        return;
4540}
4541
4542static struct ceph_connection *con_get(struct ceph_connection *con)
4543{
4544        struct ceph_mds_session *s = con->private;
4545
4546        if (get_session(s)) {
4547                dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
4548                return con;
4549        }
4550        dout("mdsc con_get %p FAIL\n", s);
4551        return NULL;
4552}
4553
4554static void con_put(struct ceph_connection *con)
4555{
4556        struct ceph_mds_session *s = con->private;
4557
4558        dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
4559        ceph_put_mds_session(s);
4560}
4561
4562/*
4563 * if the client is unresponsive for long enough, the mds will kill
4564 * the session entirely.
4565 */
4566static void peer_reset(struct ceph_connection *con)
4567{
4568        struct ceph_mds_session *s = con->private;
4569        struct ceph_mds_client *mdsc = s->s_mdsc;
4570
4571        pr_warn("mds%d closed our session\n", s->s_mds);
4572        send_mds_reconnect(mdsc, s);
4573}
4574
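/*
 * Incoming message demultiplexer.  The messenger hands us its
 * reference on @msg; we must drop it exactly once here, whatever the
 * message type.
 */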
4575static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4576{
4577        struct ceph_mds_session *s = con->private;
4578        struct ceph_mds_client *mdsc = s->s_mdsc;
4579        int type = le16_to_cpu(msg->hdr.type);
4580
4581        mutex_lock(&mdsc->mutex);
4582        if (__verify_registered_session(mdsc, s) < 0) {
4583                mutex_unlock(&mdsc->mutex);
4584                goto out;
4585        }
4586        mutex_unlock(&mdsc->mutex);
4587
4588        switch (type) {
4589        case CEPH_MSG_MDS_MAP:
4590                ceph_mdsc_handle_mdsmap(mdsc, msg);
4591                break;
4592        case CEPH_MSG_FS_MAP_USER:
4593                ceph_mdsc_handle_fsmap(mdsc, msg);
4594                break;
4595        case CEPH_MSG_CLIENT_SESSION:
4596                handle_session(s, msg);
4597                break;
4598        case CEPH_MSG_CLIENT_REPLY:
4599                handle_reply(s, msg);
4600                break;
4601        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4602                handle_forward(mdsc, s, msg);
4603                break;
4604        case CEPH_MSG_CLIENT_CAPS:
4605                ceph_handle_caps(s, msg);
4606                break;
4607        case CEPH_MSG_CLIENT_SNAP:
4608                ceph_handle_snap(mdsc, s, msg);
4609                break;
4610        case CEPH_MSG_CLIENT_LEASE:
4611                handle_lease(mdsc, s, msg);
4612                break;
4613        case CEPH_MSG_CLIENT_QUOTA:
4614                ceph_handle_quota(mdsc, s, msg);
4615                break;
4616
4617        default:
4618                pr_err("received unknown message type %d %s\n", type,
4619                       ceph_msg_type_name(type));
4620        }
4621out:
4622        ceph_msg_put(msg);
4623}
4624
4625/*
4626 * authentication
4627 */
4628
4629/*
4630 * Note: returned pointer is the address of a structure that's
4631 * managed separately.  Caller must *not* attempt to free it.
4632 */
4633static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4634                                        int *proto, int force_new)
4635{
4636        struct ceph_mds_session *s = con->private;
4637        struct ceph_mds_client *mdsc = s->s_mdsc;
4638        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4639        struct ceph_auth_handshake *auth = &s->s_auth;
4640
4641        if (force_new && auth->authorizer) {
4642                ceph_auth_destroy_authorizer(auth->authorizer);
4643                auth->authorizer = NULL;
4644        }
4645        if (!auth->authorizer) {
4646                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4647                                                      auth);
4648                if (ret)
4649                        return ERR_PTR(ret);
4650        } else {
4651                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4652                                                      auth);
4653                if (ret)
4654                        return ERR_PTR(ret);
4655        }
4656        *proto = ac->protocol;
4657
4658        return auth;
4659}
4660
4661static int add_authorizer_challenge(struct ceph_connection *con,
4662                                    void *challenge_buf, int challenge_buf_len)
4663{
4664        struct ceph_mds_session *s = con->private;
4665        struct ceph_mds_client *mdsc = s->s_mdsc;
4666        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4667
4668        return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
4669                                            challenge_buf, challenge_buf_len);
4670}
4671
4672static int verify_authorizer_reply(struct ceph_connection *con)
4673{
4674        struct ceph_mds_session *s = con->private;
4675        struct ceph_mds_client *mdsc = s->s_mdsc;
4676        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4677
4678        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4679}
4680
4681static int invalidate_authorizer(struct ceph_connection *con)
4682{
4683        struct ceph_mds_session *s = con->private;
4684        struct ceph_mds_client *mdsc = s->s_mdsc;
4685        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4686
4687        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4688
4689        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4690}
4691
4692static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4693                                struct ceph_msg_header *hdr, int *skip)
4694{
4695        struct ceph_msg *msg;
4696        int type = (int) le16_to_cpu(hdr->type);
4697        int front_len = (int) le32_to_cpu(hdr->front_len);
4698
4699        if (con->in_msg)
4700                return con->in_msg;
4701
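        /*
         * Returning NULL with *skip still 0 signals allocation failure
         * to the messenger; setting *skip = 1 would instead mean
         * "discard this message".
         */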
4702        *skip = 0;
4703        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4704        if (!msg) {
4705                pr_err("unable to allocate msg type %d len %d\n",
4706                       type, front_len);
4707                return NULL;
4708        }
4709
4710        return msg;
4711}
4712
4713static int mds_sign_message(struct ceph_msg *msg)
4714{
4715        struct ceph_mds_session *s = msg->con->private;
4716        struct ceph_auth_handshake *auth = &s->s_auth;
4717
4718        return ceph_auth_sign_message(auth, msg);
4719}
4720
4721static int mds_check_message_signature(struct ceph_msg *msg)
4722{
4723        struct ceph_mds_session *s = msg->con->private;
4724        struct ceph_auth_handshake *auth = &s->s_auth;
4725
4726        return ceph_auth_check_message_signature(auth, msg);
4727}
4728
4729static const struct ceph_connection_operations mds_con_ops = {
4730        .get = con_get,
4731        .put = con_put,
4732        .dispatch = dispatch,
4733        .get_authorizer = get_authorizer,
4734        .add_authorizer_challenge = add_authorizer_challenge,
4735        .verify_authorizer_reply = verify_authorizer_reply,
4736        .invalidate_authorizer = invalidate_authorizer,
4737        .peer_reset = peer_reset,
4738        .alloc_msg = mds_alloc_msg,
4739        .sign_message = mds_sign_message,
4740        .check_message_signature = mds_check_message_signature,
4741};
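/*
 * These ops are attached to each session's connection when the session
 * is registered; a simplified sketch of the wiring (cf.
 * register_session() earlier in this file):
 *
 *        ceph_con_init(&s->s_con, s, &mds_con_ops,
 *                      &mdsc->fsc->client->msgr);
 *
 * con->private then points back at the owning ceph_mds_session, which
 * is why every callback above begins by recovering the session from it.
 */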
4742
4743/* eof */
4744