linux/fs/ceph/mds_client.c
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}
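
/*
 * Illustrative sketch only (not part of the original file): the
 * struct_v/struct_compat/struct_len triple decoded above is the standard
 * Ceph versioned-encoding envelope, and the new-style parsers below all
 * repeat the same dance. A hypothetical helper that merely validates the
 * envelope and skips the payload, assuming we only understand
 * struct_compat == 1, could look like this:
 */
static int __maybe_unused skip_versioned_struct(void **p, void *end)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        *p += struct_len;       /* skip the payload without decoding it */
        return 0;
bad:
        return -EIO;
}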

/*
 * parse individual inode info
 */
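/*
 * Two wire formats are handled below: when features == (u64)-1 the sender
 * uses the new versioned encoding and every field up to struct_len is
 * present; otherwise each optional field is gated by an individual
 * CEPH_FEATURE_* bit.
 */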
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }

                /* btime */
                ceph_decode_need(p, end, sizeof(info->btime), bad);
                ceph_decode_copy(p, &info->btime, sizeof(info->btime));

                /* change attribute */
                ceph_decode_64_safe(p, end, info->change_attr, bad);

                /* dir pin */
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
                } else {
                        info->dir_pin = -ENODATA;
                }

                /* snapshot birth time, remains zero for v<=2 */
                if (struct_v >= 3) {
                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
                        ceph_decode_copy(p, &info->snap_btime,
                                         sizeof(info->snap_btime));
                } else {
                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
                }

                *p = end;
        } else {
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }

                if (features & CEPH_FEATURE_FS_BTIME) {
                        ceph_decode_need(p, end, sizeof(info->btime), bad);
                        ceph_decode_copy(p, &info->btime, sizeof(info->btime));
                        ceph_decode_64_safe(p, end, info->change_attr, bad);
                }

                info->dir_pin = -ENODATA;
                /* info->snap_btime remains zero */
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding whose struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding whose struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**lease), bad);
        *lease = *p;
        *p += sizeof(**lease);
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                /* dentry */
                ceph_decode_32_safe(p, end, rde->name_len, bad);
                ceph_decode_need(p, end, rde->name_len, bad);
                rde->name = *p;
                *p += rde->name_len;
                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features);
                if (err)
                        goto out_bad;
                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        /* Skip over any unrecognized fields */
        *p = end;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE         xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        dout("got %u sets of delegated inodes\n", sets);
        while (sets--) {
                u64 start, len, ino;

                ceph_decode_64_safe(p, end, start, bad);
                ceph_decode_64_safe(p, end, len, bad);
                while (len--) {
                        int err = xa_insert(&s->s_delegated_inos, ino = start++,
                                            DELEGATED_INO_AVAILABLE,
                                            GFP_KERNEL);
                        if (!err) {
                                dout("added delegated inode 0x%llx\n",
                                     start - 1);
                        } else if (err == -EBUSY) {
                                pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
                                        start - 1);
                        } else {
                                return err;
                        }
                }
        }
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        unsigned long ino;
        void *val;

        xa_for_each(&s->s_delegated_inos, ino, val) {
                val = xa_erase(&s->s_delegated_inos, ino);
                if (val == DELEGATED_INO_AVAILABLE)
                        return ino;
        }
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
                         GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        if (sets)
                ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return 0;
}
#endif /* BITS_PER_LONG == 64 */
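
/*
 * Usage sketch (illustrative only, not part of the original file): an
 * async create path would claim a delegated inode number before sending
 * the request, and hand it back if the request never makes it out. A
 * return of 0 from ceph_get_deleg_ino() means no delegated ino is
 * available and the MDS must pick one itself.
 */
static void __maybe_unused deleg_ino_example(struct ceph_mds_session *s)
{
        u64 ino = ceph_get_deleg_ino(s);

        if (ino && ceph_restore_deleg_ino(s, ino))
                pr_warn("ceph: could not return delegated ino 0x%llx\n", ino);
}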

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features, struct ceph_mds_session *s)
{
        int ret;

        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        /* Malformed reply? */
                        info->has_create_ino = false;
                } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
                        u8 struct_v, struct_compat;
                        u32 len;

                        info->has_create_ino = true;
                        ceph_decode_8_safe(p, end, struct_v, bad);
                        ceph_decode_8_safe(p, end, struct_compat, bad);
                        ceph_decode_32_safe(p, end, len, bad);
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        ret = ceph_parse_deleg_inos(p, end, s);
                        if (ret)
                                return ret;
                } else {
                        /* legacy */
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        info->has_create_ino = true;
                }
        } else {
                if (*p != end)
                        goto bad;
        }

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features, struct ceph_mds_session *s)
{
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features, s);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features, s);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}
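
/*
 * For reference, the reply layout handled above is three length-prefixed
 * blobs following the fixed-size head (all lengths little-endian u32):
 *
 *      struct ceph_mds_reply_head
 *      u32 trace_len;  trace blob  (dentry/inode trace)
 *      u32 extra_len;  extra blob  (op-specific: readdir, filelock, create)
 *      u32 snap_len;   snap blob
 */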

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_entries)
                return;
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_CLOSED: return "closed";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                WARN_ON(mutex_is_locked(&s->s_mutex));
                xa_destroy(&s->s_delegated_inos);
                kfree(s);
        }
}
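
/*
 * Illustrative refcount pattern (not a new API): because the get side uses
 * refcount_inc_not_zero(), callers must handle a NULL return, since the
 * session may already be on its way down:
 *
 *      s = ceph_get_mds_session(s);    // NULL if s_ref was already 0
 *      if (s) {
 *              ... use s ...
 *              ceph_put_mds_session(s);
 *      }
 */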

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->possible_max_rank)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 1;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        xa_init(&s->s_delegated_inos);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

        INIT_LIST_HEAD(&s->s_cap_dirty);
        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        ceph_mdsc_release_dir_caps_no_check(req);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_inode);
        }
        if (req->r_parent) {
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
                ceph_async_iput(req->r_parent);
        }
        ceph_async_iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                ceph_async_iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        WARN_ON_ONCE(!list_empty(&req->r_wait));
        kmem_cache_free(ceph_mds_request_cachep, req);
}

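/*
 * This macro (from super.h) generates the rbtree helpers used below:
 * lookup_request(), insert_request() and erase_request(), keyed on r_tid.
 */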
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the directory
 * we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        int ret = 0;

        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps) {
                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                        req->r_num_caps);
                if (ret < 0) {
                        pr_err("__register_request %p "
                               "failed to reserve caps: %d\n", req, ret);
                        /* set req->r_err to fail early from __do_request */
                        req->r_err = ret;
                        return;
                }
        }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                ihold(dir);
                req->r_unsafe_dir = dir;
                spin_lock(&ci->i_unsafe_lock);
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        /* Never leave an unregistered request on an unsafe list! */
        list_del_init(&req->r_unsafe_item);

        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
        struct inode *inode = NULL;

        while (dentry && !IS_ROOT(dentry)) {
                inode = d_inode_rcu(dentry);
                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                dentry = dentry->d_parent;
        }
        if (inode)
                inode = igrab(inode);
        return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req,
                        bool *random)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

        if (random)
                *random = false;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("%s using resend_mds mds%d\n", __func__,
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
                        inode = req->r_inode;
                        ihold(inode);
                } else {
                        /* req->r_dentry is non-null for LSSNAP request */
                        rcu_read_lock();
                        inode = get_nonsnap_parent(req->r_dentry);
                        rcu_read_unlock();
                        dout("%s using snapdir's parent %p\n", __func__, inode);
                }
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent;
                struct inode *dir;

                rcu_read_lock();
                parent = READ_ONCE(req->r_dentry->d_parent);
                dir = req->r_parent ? : d_inode_rcu(parent);

                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs or parent went negative */
                        inode = d_inode(req->r_dentry);
                        if (inode)
                                ihold(inode);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        inode = get_nonsnap_parent(parent);
                        dout("%s using nonsnap parent %p\n", __func__, inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = igrab(dir);
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        } else {
                                ihold(inode);
                        }
                }
                rcu_read_unlock();
        }

        dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
             hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
                                     __func__, inode, ceph_vinop(inode),
                                     frag.frag, mds, (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE &&
                                    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
                                        goto out;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
                                     __func__, inode, ceph_vinop(inode),
                                     frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE) {
                                        if (mode == USE_ANY_MDS &&
                                            !ceph_mdsmap_is_laggy(mdsc->mdsmap,
                                                                  mds))
                                                goto out;
                                }
                        }
                        mode = USE_AUTH_MDS;
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                ceph_async_iput(inode);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
out:
        /* avoid calling iput_final() while holding mdsc->mutex or
         * in mds dispatch threads */
        ceph_async_iput(inode);
        return mds;

random:
        if (random)
                *random = true;

        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("%s chose random mds%d\n", __func__, mds);
        return mds;
}
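
/*
 * Illustrative call pattern (roughly how the request submission path uses
 * this, under mdsc->mutex):
 *
 *      bool random;
 *      int mds = __choose_mds(mdsc, req, &random);
 */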


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end)
{
        static const size_t count = ARRAY_SIZE(feature_bits);

        if (count > 0) {
                size_t i;
                size_t size = FEATURE_BYTES(count);

                BUG_ON(*p + 4 + size > end);
                ceph_encode_32(p, size);
                memset(*p, 0, size);
                for (i = 0; i < count; i++) {
                        size_t bit = feature_bits[i];

                        /* index the bitmap by the feature bit itself, not
                         * by the array position, in case the bits have
                         * holes */
                        ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
                }
                *p += size;
        } else {
                BUG_ON(*p + 4 > end);
                ceph_encode_32(p, 0);
        }
}
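
/*
 * Worked example of the FEATURE_BYTES arithmetic above: the bitmap must
 * cover the highest supported feature bit, rounded up to whole 64-bit
 * words. If the largest entry in feature_bits[] were 14, then
 * DIV_ROUND_UP(14 + 1, 64) * 8 == 8, i.e. one 8-byte word on the wire.
 */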

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int extra_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
        size_t size, count;
        void *p, *end;

        const char* metadata[][2] = {
                {"hostname", mdsc->nodename},
                {"kernel_version", init_utsname()->release},
                {"entity_id", opt->name ? : ""},
                {"root", fsopt->server_path ? : "/"},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        extra_bytes = 4;  /* map length */
        for (i = 0; metadata[i][0]; ++i) {
                extra_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }

        /* supported feature */
        size = 0;
        count = ARRAY_SIZE(feature_bits);
        if (count > 0)
                size = FEATURE_BYTES(count);
        extra_bytes += 4 + size;

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        h = p;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v3
         */
        msg->hdr.version = cpu_to_le16(3);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p += sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0]; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        encode_supported_features(&p, end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        return msg;
}
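
/*
 * The encoded front section built above, for reference:
 *
 *      struct ceph_mds_session_head    (op, seq)
 *      u32 n                           number of metadata entries
 *      n x { u32 key_len, key bytes, u32 val_len, val bytes }
 *      u32 size, feature bitmap        (from encode_supported_features())
 */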

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING)
                __open_session(mdsc, session);

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->possible_max_rank)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                if (!IS_ERR(ts))
                        ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
                                struct list_head *target)
{
        lockdep_assert_held(&session->s_cap_lock);

        list_splice_init(&session->s_cap_releases, target);
        session->s_num_cap_releases = 0;
        dout("dispose_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
                                 struct list_head *dispose)
{
        while (!list_empty(dispose)) {
                struct ceph_cap *cap;
                /* put each cap that was detached from the session */
                cap = list_first_entry(dispose, struct ceph_cap, session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
        }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                                     struct ceph_mds_session *session)
{
        struct ceph_mds_request *req;
        struct rb_node *p;
        struct ceph_inode_info *ci;

        dout("cleanup_session_requests mds%d\n", session->s_mds);
        mutex_lock(&mdsc->mutex);
        while (!list_empty(&session->s_unsafe)) {
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                pr_warn_ratelimited(" dropping unsafe request %llu\n",
                                    req->r_tid);
                if (req->r_target_inode) {
                        /* dropping unsafe change of inode's attributes */
                        ci = ceph_inode(req->r_target_inode);
                        errseq_set(&ci->i_meta_err, -EIO);
                }
                if (req->r_unsafe_dir) {
                        /* dropping unsafe directory operation */
                        ci = ceph_inode(req->r_unsafe_dir);
                        errseq_set(&ci->i_meta_err, -EIO);
                }
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
        p = rb_first(&mdsc->request_tree);
        while (p) {
                req = rb_entry(p, struct ceph_mds_request, r_node);
                p = rb_next(p);
                if (req->r_session &&
                    req->r_session->s_mds == session->s_mds)
                        req->r_attempts = 0;
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
                              int (*cb)(struct inode *, struct ceph_cap *,
                                        void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                if (last_inode) {
                        /* avoid calling iput_final() while holding
                         * s_mutex or in mds dispatch threads */
                        ceph_async_iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                if (!cap->ci) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        if (cap->queue_release)
                                __ceph_queue_cap_release(session, cap);
                        else
                                old_cap = cap;  /* put_cap it w/o locks held */
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        ceph_async_iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}
1506
1507static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1508                                  void *arg)
1509{
1510        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1511        struct ceph_inode_info *ci = ceph_inode(inode);
1512        LIST_HEAD(to_remove);
1513        bool dirty_dropped = false;
1514        bool invalidate = false;
1515
1516        dout("removing cap %p, ci is %p, inode is %p\n",
1517             cap, ci, &ci->vfs_inode);
1518        spin_lock(&ci->i_ceph_lock);
1519        __ceph_remove_cap(cap, false);
1520        if (!ci->i_auth_cap) {
1521                struct ceph_cap_flush *cf;
1522                struct ceph_mds_client *mdsc = fsc->mdsc;
1523
1524                if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1525                        if (inode->i_data.nrpages > 0)
1526                                invalidate = true;
1527                        if (ci->i_wrbuffer_ref > 0)
1528                                mapping_set_error(&inode->i_data, -EIO);
1529                }
1530
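                    /* detach pending cap flushes; they are freed below once
                     * i_ceph_lock has been dropped */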
1531                while (!list_empty(&ci->i_cap_flush_list)) {
1532                        cf = list_first_entry(&ci->i_cap_flush_list,
1533                                              struct ceph_cap_flush, i_list);
1534                        list_move(&cf->i_list, &to_remove);
1535                }
1536
1537                spin_lock(&mdsc->cap_dirty_lock);
1538
1539                list_for_each_entry(cf, &to_remove, i_list)
1540                        list_del(&cf->g_list);
1541
1542                if (!list_empty(&ci->i_dirty_item)) {
1543                        pr_warn_ratelimited(
1544                                " dropping dirty %s state for %p %lld\n",
1545                                ceph_cap_string(ci->i_dirty_caps),
1546                                inode, ceph_ino(inode));
1547                        ci->i_dirty_caps = 0;
1548                        list_del_init(&ci->i_dirty_item);
1549                        dirty_dropped = true;
1550                }
1551                if (!list_empty(&ci->i_flushing_item)) {
1552                        pr_warn_ratelimited(
1553                                " dropping dirty+flushing %s state for %p %lld\n",
1554                                ceph_cap_string(ci->i_flushing_caps),
1555                                inode, ceph_ino(inode));
1556                        ci->i_flushing_caps = 0;
1557                        list_del_init(&ci->i_flushing_item);
1558                        mdsc->num_cap_flushing--;
1559                        dirty_dropped = true;
1560                }
1561                spin_unlock(&mdsc->cap_dirty_lock);
1562
1563                if (dirty_dropped) {
1564                        errseq_set(&ci->i_meta_err, -EIO);
1565
1566                        if (ci->i_wrbuffer_ref_head == 0 &&
1567                            ci->i_wr_ref == 0 &&
1568                            ci->i_dirty_caps == 0 &&
1569                            ci->i_flushing_caps == 0) {
1570                                ceph_put_snap_context(ci->i_head_snapc);
1571                                ci->i_head_snapc = NULL;
1572                        }
1573                }
1574
1575                if (atomic_read(&ci->i_filelock_ref) > 0) {
1576                        /* make further file lock syscall return -EIO */
1577                        ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1578                        pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1579                                            inode, ceph_ino(inode));
1580                }
1581
1582                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1583                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1584                        ci->i_prealloc_cap_flush = NULL;
1585                }
1586        }
1587        spin_unlock(&ci->i_ceph_lock);
1588        while (!list_empty(&to_remove)) {
1589                struct ceph_cap_flush *cf;
1590                cf = list_first_entry(&to_remove,
1591                                      struct ceph_cap_flush, i_list);
1592                list_del(&cf->i_list);
1593                ceph_free_cap_flush(cf);
1594        }
1595
1596        wake_up_all(&ci->i_cap_wq);
1597        if (invalidate)
1598                ceph_queue_invalidate(inode);
1599        if (dirty_dropped)
1600                iput(inode);
1601        return 0;
1602}
1603
1604/*
1605 * caller must hold session s_mutex
1606 */
1607static void remove_session_caps(struct ceph_mds_session *session)
1608{
1609        struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1610        struct super_block *sb = fsc->sb;
1611        LIST_HEAD(dispose);
1612
1613        dout("remove_session_caps on %p\n", session);
1614        ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1615
1616        wake_up_all(&fsc->mdsc->cap_flushing_wq);
1617
1618        spin_lock(&session->s_cap_lock);
1619        if (session->s_nr_caps > 0) {
1620                struct inode *inode;
1621                struct ceph_cap *cap, *prev = NULL;
1622                struct ceph_vino vino;
1623                /*
1624                 * iterate_session_caps() skips inodes that are being
1625                 * deleted, we need to wait until deletions are complete.
1626                 * __wait_on_freeing_inode() is designed for the job,
1627                 * but it is not exported, so use lookup inode function
1628                 * to access it.
1629                 */
1630                while (!list_empty(&session->s_caps)) {
1631                        cap = list_entry(session->s_caps.next,
1632                                         struct ceph_cap, session_caps);
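                            /* the head cap survived a find/iput cycle, so no
                             * progress is being made; bail to avoid looping */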
1633                        if (cap == prev)
1634                                break;
1635                        prev = cap;
1636                        vino = cap->ci->i_vino;
1637                        spin_unlock(&session->s_cap_lock);
1638
1639                        inode = ceph_find_inode(sb, vino);
1640                         /* avoid calling iput_final() while holding s_mutex */
1641                        ceph_async_iput(inode);
1642
1643                        spin_lock(&session->s_cap_lock);
1644                }
1645        }
1646
1647        /* detach queued cap releases; disposed of after s_cap_lock is dropped */
1648        detach_cap_releases(session, &dispose);
1649
1650        BUG_ON(session->s_nr_caps > 0);
1651        BUG_ON(!list_empty(&session->s_cap_flushing));
1652        spin_unlock(&session->s_cap_lock);
1653        dispose_cap_releases(session->s_mdsc, &dispose);
1654}
1655
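    /* event codes for wake_up_session_caps() / wake_up_session_cb() */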
1656enum {
1657        RECONNECT,
1658        RENEWCAPS,
1659        FORCE_RO,
1660};
1661
1662/*
1663 * wake up any threads waiting on this session's caps.  if the cap is
1664 * old (didn't get renewed on the client reconnect), remove it now.
1665 *
1666 * caller must hold s_mutex.
1667 */
1668static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1669                              void *arg)
1670{
1671        struct ceph_inode_info *ci = ceph_inode(inode);
1672        unsigned long ev = (unsigned long)arg;
1673
1674        if (ev == RECONNECT) {
1675                spin_lock(&ci->i_ceph_lock);
1676                ci->i_wanted_max_size = 0;
1677                ci->i_requested_max_size = 0;
1678                spin_unlock(&ci->i_ceph_lock);
1679        } else if (ev == RENEWCAPS) {
1680                if (cap->cap_gen < cap->session->s_cap_gen) {
1681                        /* mds did not re-issue stale cap */
1682                        spin_lock(&ci->i_ceph_lock);
1683                        cap->issued = cap->implemented = CEPH_CAP_PIN;
1684                        spin_unlock(&ci->i_ceph_lock);
1685                }
1686        } else if (ev == FORCE_RO) {
1687        }
1688        wake_up_all(&ci->i_cap_wq);
1689        return 0;
1690}
1691
1692static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1693{
1694        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1695        ceph_iterate_session_caps(session, wake_up_session_cb,
1696                                  (void *)(unsigned long)ev);
1697}
1698
1699/*
1700 * Send periodic message to MDS renewing all currently held caps.  The
1701 * ack will reset the expiration for all caps from this session.
1702 *
1703 * caller holds s_mutex
1704 */
1705static int send_renew_caps(struct ceph_mds_client *mdsc,
1706                           struct ceph_mds_session *session)
1707{
1708        struct ceph_msg *msg;
1709        int state;
1710
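            /* warn about stale caps only once per episode: setting
             * s_renew_requested below makes the second test false until
             * the ttl moves forward again */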
1711        if (time_after_eq(jiffies, session->s_cap_ttl) &&
1712            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1713                pr_info("mds%d caps stale\n", session->s_mds);
1714        session->s_renew_requested = jiffies;
1715
1716        /* do not try to renew caps until a recovering mds has reconnected
1717         * with its clients. */
1718        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1719        if (state < CEPH_MDS_STATE_RECONNECT) {
1720                dout("send_renew_caps ignoring mds%d (%s)\n",
1721                     session->s_mds, ceph_mds_state_name(state));
1722                return 0;
1723        }
1724
1725        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1726                ceph_mds_state_name(state));
1727        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1728                                 ++session->s_renew_seq);
1729        if (!msg)
1730                return -ENOMEM;
1731        ceph_con_send(&session->s_con, msg);
1732        return 0;
1733}
1734
1735static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1736                             struct ceph_mds_session *session, u64 seq)
1737{
1738        struct ceph_msg *msg;
1739
1740        dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1741             session->s_mds, ceph_session_state_name(session->s_state), seq);
1742        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1743        if (!msg)
1744                return -ENOMEM;
1745        ceph_con_send(&session->s_con, msg);
1746        return 0;
1747}
1748
1749
1750/*
1751 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1752 *
1753 * Called under session->s_mutex
1754 */
1755static void renewed_caps(struct ceph_mds_client *mdsc,
1756                         struct ceph_mds_session *session, int is_renew)
1757{
1758        int was_stale;
1759        int wake = 0;
1760
1761        spin_lock(&session->s_cap_lock);
1762        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1763
1764        session->s_cap_ttl = session->s_renew_requested +
1765                mdsc->mdsmap->m_session_timeout*HZ;
1766
1767        if (was_stale) {
1768                if (time_before(jiffies, session->s_cap_ttl)) {
1769                        pr_info("mds%d caps renewed\n", session->s_mds);
1770                        wake = 1;
1771                } else {
1772                        pr_info("mds%d caps still stale\n", session->s_mds);
1773                }
1774        }
1775        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1776             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1777             time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1778        spin_unlock(&session->s_cap_lock);
1779
1780        if (wake)
1781                wake_up_session_caps(session, RENEWCAPS);
1782}
1783
1784/*
1785 * send a session close request
1786 */
1787static int request_close_session(struct ceph_mds_client *mdsc,
1788                                 struct ceph_mds_session *session)
1789{
1790        struct ceph_msg *msg;
1791
1792        dout("request_close_session mds%d state %s seq %lld\n",
1793             session->s_mds, ceph_session_state_name(session->s_state),
1794             session->s_seq);
1795        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1796        if (!msg)
1797                return -ENOMEM;
1798        ceph_con_send(&session->s_con, msg);
1799        return 1;
1800}
1801
1802/*
1803 * Called with s_mutex held.
1804 */
1805static int __close_session(struct ceph_mds_client *mdsc,
1806                         struct ceph_mds_session *session)
1807{
1808        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1809                return 0;
1810        session->s_state = CEPH_MDS_SESSION_CLOSING;
1811        return request_close_session(mdsc, session);
1812}
1813
1814static bool drop_negative_children(struct dentry *dentry)
1815{
1816        struct dentry *child;
1817        bool all_negative = true;
1818
1819        if (!d_is_dir(dentry))
1820                goto out;
1821
1822        spin_lock(&dentry->d_lock);
1823        list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1824                if (d_really_is_positive(child)) {
1825                        all_negative = false;
1826                        break;
1827                }
1828        }
1829        spin_unlock(&dentry->d_lock);
1830
1831        if (all_negative)
1832                shrink_dcache_parent(dentry);
1833out:
1834        return all_negative;
1835}
1836
1837/*
1838 * Trim old(er) caps.
1839 *
1840 * Because we can't cache an inode without one or more caps, we do
1841 * this indirectly: if a cap is unused, we prune its aliases, at which
1842 * point the inode will hopefully get dropped too.
1843 *
1844 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1845 * memory pressure from the MDS, though, so it needn't be perfect.
1846 */
1847static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1848{
1849        int *remaining = arg;
1850        struct ceph_inode_info *ci = ceph_inode(inode);
1851        int used, wanted, oissued, mine;
1852
1853        if (*remaining <= 0)
1854                return -1;
1855
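            /* mine: bits granted by this cap; oissued: bits issued to the
             * inode by caps belonging to other sessions */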
1856        spin_lock(&ci->i_ceph_lock);
1857        mine = cap->issued | cap->implemented;
1858        used = __ceph_caps_used(ci);
1859        wanted = __ceph_caps_file_wanted(ci);
1860        oissued = __ceph_caps_issued_other(ci, cap);
1861
1862        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1863             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1864             ceph_cap_string(used), ceph_cap_string(wanted));
1865        if (cap == ci->i_auth_cap) {
1866                if (ci->i_dirty_caps || ci->i_flushing_caps ||
1867                    !list_empty(&ci->i_cap_snaps))
1868                        goto out;
1869                if ((used | wanted) & CEPH_CAP_ANY_WR)
1870                        goto out;
1871                /* Note: it's possible that i_filelock_ref becomes non-zero
1872                 * after dropping auth caps. It doesn't hurt because reply
1873                 * of lock mds request will re-add auth caps. */
1874                if (atomic_read(&ci->i_filelock_ref) > 0)
1875                        goto out;
1876        }
1877        /* The inode has cached pages, but it's no longer used.
1878         * we can safely drop it */
1879        if (S_ISREG(inode->i_mode) &&
1880            wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1881            !(oissued & CEPH_CAP_FILE_CACHE)) {
1882                used = 0;
1883                oissued = 0;
1884        }
1885        if ((used | wanted) & ~oissued & mine)
1886                goto out;   /* we need these caps */
1887
1888        if (oissued) {
1889                /* we aren't the only cap.. just remove us */
1890                __ceph_remove_cap(cap, true);
1891                (*remaining)--;
1892        } else {
1893                struct dentry *dentry;
1894                /* try dropping referring dentries */
1895                spin_unlock(&ci->i_ceph_lock);
1896                dentry = d_find_any_alias(inode);
1897                if (dentry && drop_negative_children(dentry)) {
1898                        int count;
1899                        dput(dentry);
1900                        d_prune_aliases(inode);
1901                        count = atomic_read(&inode->i_count);
1902                        if (count == 1)
1903                                (*remaining)--;
1904                        dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1905                             inode, cap, count);
1906                } else {
1907                        dput(dentry);
1908                }
1909                return 0;
1910        }
1911
1912out:
1913        spin_unlock(&ci->i_ceph_lock);
1914        return 0;
1915}
1916
1917/*
1918 * Trim session cap count down to some max number.
1919 */
1920int ceph_trim_caps(struct ceph_mds_client *mdsc,
1921                   struct ceph_mds_session *session,
1922                   int max_caps)
1923{
1924        int trim_caps = session->s_nr_caps - max_caps;
1925
1926        dout("trim_caps mds%d start: %d / %d, trim %d\n",
1927             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1928        if (trim_caps > 0) {
1929                int remaining = trim_caps;
1930
1931                ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
1932                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1933                     session->s_mds, session->s_nr_caps, max_caps,
1934                        trim_caps - remaining);
1935        }
1936
1937        ceph_flush_cap_releases(mdsc, session);
1938        return 0;
1939}
1940
1941static int check_caps_flush(struct ceph_mds_client *mdsc,
1942                            u64 want_flush_tid)
1943{
1944        int ret = 1;
1945
1946        spin_lock(&mdsc->cap_dirty_lock);
1947        if (!list_empty(&mdsc->cap_flush_list)) {
1948                struct ceph_cap_flush *cf =
1949                        list_first_entry(&mdsc->cap_flush_list,
1950                                         struct ceph_cap_flush, g_list);
1951                if (cf->tid <= want_flush_tid) {
1952                        dout("check_caps_flush still flushing tid "
1953                             "%llu <= %llu\n", cf->tid, want_flush_tid);
1954                        ret = 0;
1955                }
1956        }
1957        spin_unlock(&mdsc->cap_dirty_lock);
1958        return ret;
1959}
1960
1961/*
1962 * wait for outstanding cap flushes to complete.
1963 *
1964 * returns once everything through want_flush_tid has been flushed
1965 */
1966static void wait_caps_flush(struct ceph_mds_client *mdsc,
1967                            u64 want_flush_tid)
1968{
1969        dout("check_caps_flush want %llu\n", want_flush_tid);
1970
1971        wait_event(mdsc->cap_flushing_wq,
1972                   check_caps_flush(mdsc, want_flush_tid));
1973
1974        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1975}
1976
1977/*
1978 * called under s_mutex
1979 */
1980static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1981                                   struct ceph_mds_session *session)
1982{
1983        struct ceph_msg *msg = NULL;
1984        struct ceph_mds_cap_release *head;
1985        struct ceph_mds_cap_item *item;
1986        struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1987        struct ceph_cap *cap;
1988        LIST_HEAD(tmp_list);
1989        int num_cap_releases;
1990        __le32  barrier, *cap_barrier;
1991
1992        down_read(&osdc->lock);
1993        barrier = cpu_to_le32(osdc->epoch_barrier);
1994        up_read(&osdc->lock);
1995
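            /* pull everything queued so far off the list under s_cap_lock,
             * then build and send CEPH_CAPS_PER_RELEASE-sized messages with
             * the lock dropped; "again" catches releases queued meanwhile */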
1996        spin_lock(&session->s_cap_lock);
1997again:
1998        list_splice_init(&session->s_cap_releases, &tmp_list);
1999        num_cap_releases = session->s_num_cap_releases;
2000        session->s_num_cap_releases = 0;
2001        spin_unlock(&session->s_cap_lock);
2002
2003        while (!list_empty(&tmp_list)) {
2004                if (!msg) {
2005                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2006                                        PAGE_SIZE, GFP_NOFS, false);
2007                        if (!msg)
2008                                goto out_err;
2009                        head = msg->front.iov_base;
2010                        head->num = cpu_to_le32(0);
2011                        msg->front.iov_len = sizeof(*head);
2012
2013                        msg->hdr.version = cpu_to_le16(2);
2014                        msg->hdr.compat_version = cpu_to_le16(1);
2015                }
2016
2017                cap = list_first_entry(&tmp_list, struct ceph_cap,
2018                                        session_caps);
2019                list_del(&cap->session_caps);
2020                num_cap_releases--;
2021
2022                head = msg->front.iov_base;
2023                put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2024                                   &head->num);
2025                item = msg->front.iov_base + msg->front.iov_len;
2026                item->ino = cpu_to_le64(cap->cap_ino);
2027                item->cap_id = cpu_to_le64(cap->cap_id);
2028                item->migrate_seq = cpu_to_le32(cap->mseq);
2029                item->seq = cpu_to_le32(cap->issue_seq);
2030                msg->front.iov_len += sizeof(*item);
2031
2032                ceph_put_cap(mdsc, cap);
2033
2034                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2035                        /* append cap_barrier field */
2036                        cap_barrier = msg->front.iov_base + msg->front.iov_len;
2037                        *cap_barrier = barrier;
2038                        msg->front.iov_len += sizeof(*cap_barrier);
2039
2040                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2041                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2042                        ceph_con_send(&session->s_con, msg);
2043                        msg = NULL;
2044                }
2045        }
2046
2047        BUG_ON(num_cap_releases != 0);
2048
2049        spin_lock(&session->s_cap_lock);
2050        if (!list_empty(&session->s_cap_releases))
2051                goto again;
2052        spin_unlock(&session->s_cap_lock);
2053
2054        if (msg) {
2055                /* append cap_barrier field */
2056                cap_barrier = msg->front.iov_base + msg->front.iov_len;
2057                *cap_barrier = barrier;
2058                msg->front.iov_len += sizeof(*cap_barrier);
2059
2060                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2061                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2062                ceph_con_send(&session->s_con, msg);
2063        }
2064        return;
2065out_err:
2066        pr_err("send_cap_releases mds%d, failed to allocate message\n",
2067                session->s_mds);
2068        spin_lock(&session->s_cap_lock);
2069        list_splice(&tmp_list, &session->s_cap_releases);
2070        session->s_num_cap_releases += num_cap_releases;
2071        spin_unlock(&session->s_cap_lock);
2072}
2073
2074static void ceph_cap_release_work(struct work_struct *work)
2075{
2076        struct ceph_mds_session *session =
2077                container_of(work, struct ceph_mds_session, s_cap_release_work);
2078
2079        mutex_lock(&session->s_mutex);
2080        if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2081            session->s_state == CEPH_MDS_SESSION_HUNG)
2082                ceph_send_cap_releases(session->s_mdsc, session);
2083        mutex_unlock(&session->s_mutex);
2084        ceph_put_mds_session(session);
2085}
2086
2087void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2088                             struct ceph_mds_session *session)
2089{
2090        if (mdsc->stopping)
2091                return;
2092
2093        ceph_get_mds_session(session);
2094        if (queue_work(mdsc->fsc->cap_wq,
2095                       &session->s_cap_release_work)) {
2096                dout("cap release work queued\n");
2097        } else {
2098                ceph_put_mds_session(session);
2099                dout("failed to queue cap release work\n");
2100        }
2101}
2102
2103/*
2104 * caller holds session->s_cap_lock
2105 */
2106void __ceph_queue_cap_release(struct ceph_mds_session *session,
2107                              struct ceph_cap *cap)
2108{
2109        list_add_tail(&cap->session_caps, &session->s_cap_releases);
2110        session->s_num_cap_releases++;
2111
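            /* flush once another CEPH_CAPS_PER_RELEASE-sized batch is ready */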
2112        if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2113                ceph_flush_cap_releases(session->s_mdsc, session);
2114}
2115
2116static void ceph_cap_reclaim_work(struct work_struct *work)
2117{
2118        struct ceph_mds_client *mdsc =
2119                container_of(work, struct ceph_mds_client, cap_reclaim_work);
2120        int ret = ceph_trim_dentries(mdsc);
2121        if (ret == -EAGAIN)
2122                ceph_queue_cap_reclaim_work(mdsc);
2123}
2124
2125void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2126{
2127        if (mdsc->stopping)
2128                return;
2129
2130        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2131                dout("caps reclaim work queued\n");
2132        } else {
2133                dout("failed to queue caps reclaim work\n");
2134        }
2135}
2136
2137void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2138{
2139        int val;
2140        if (!nr)
2141                return;
2142        val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
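            /* schedule reclaim roughly once per CEPH_CAPS_PER_RELEASE caps:
             * the test fires when the counter crosses a multiple */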
2143        if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2144                atomic_set(&mdsc->cap_reclaim_pending, 0);
2145                ceph_queue_cap_reclaim_work(mdsc);
2146        }
2147}
2148
2149/*
2150 * requests
2151 */
2152
2153int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2154                                    struct inode *dir)
2155{
2156        struct ceph_inode_info *ci = ceph_inode(dir);
2157        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2158        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2159        size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2160        int order, num_entries;
2161
2162        spin_lock(&ci->i_ceph_lock);
2163        num_entries = ci->i_files + ci->i_subdirs;
2164        spin_unlock(&ci->i_ceph_lock);
2165        num_entries = max(num_entries, 1);
2166        num_entries = min(num_entries, opt->max_readdir);
2167
2168        order = get_order(size * num_entries);
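            /* try the full estimate first, falling back to smaller
             * allocation orders if memory is fragmented or tight */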
2169        while (order >= 0) {
2170                rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2171                                                             __GFP_NOWARN,
2172                                                             order);
2173                if (rinfo->dir_entries)
2174                        break;
2175                order--;
2176        }
2177        if (!rinfo->dir_entries)
2178                return -ENOMEM;
2179
2180        num_entries = (PAGE_SIZE << order) / size;
2181        num_entries = min(num_entries, opt->max_readdir);
2182
2183        rinfo->dir_buf_size = PAGE_SIZE << order;
2184        req->r_num_caps = num_entries + 1;
2185        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2186        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2187        return 0;
2188}
2189
2190/*
2191 * Create an mds request.
2192 */
2193struct ceph_mds_request *
2194ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2195{
2196        struct ceph_mds_request *req;
2197
2198        req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2199        if (!req)
2200                return ERR_PTR(-ENOMEM);
2201
2202        mutex_init(&req->r_fill_mutex);
2203        req->r_mdsc = mdsc;
2204        req->r_started = jiffies;
2205        req->r_resend_mds = -1;
2206        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2207        INIT_LIST_HEAD(&req->r_unsafe_target_item);
2208        req->r_fmode = -1;
2209        kref_init(&req->r_kref);
2210        RB_CLEAR_NODE(&req->r_node);
2211        INIT_LIST_HEAD(&req->r_wait);
2212        init_completion(&req->r_completion);
2213        init_completion(&req->r_safe_completion);
2214        INIT_LIST_HEAD(&req->r_unsafe_item);
2215
2216        ktime_get_coarse_real_ts64(&req->r_stamp);
2217
2218        req->r_op = op;
2219        req->r_direct_mode = mode;
2220        return req;
2221}
2222
2223/*
2224 * return the oldest (lowest tid) request in the request tree, or NULL if none.
2225 *
2226 * called under mdsc->mutex.
2227 */
2228static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2229{
2230        if (RB_EMPTY_ROOT(&mdsc->request_tree))
2231                return NULL;
2232        return rb_entry(rb_first(&mdsc->request_tree),
2233                        struct ceph_mds_request, r_node);
2234}
2235
2236static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2237{
2238        return mdsc->oldest_tid;
2239}
2240
2241/*
2242 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2243 * on build_path_from_dentry in fs/cifs/dir.c.
2244 *
2245 * If @stop_on_nosnap, generate path relative to the first non-snapped
2246 * inode.
2247 *
2248 * Encode hidden .snap dirs as a double /, i.e.
2249 *   foo/.snap/bar -> foo//bar
2250 */
2251char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2252                           int stop_on_nosnap)
2253{
2254        struct dentry *temp;
2255        char *path;
2256        int pos;
2257        unsigned seq;
2258        u64 base;
2259
2260        if (!dentry)
2261                return ERR_PTR(-EINVAL);
2262
2263        path = __getname();
2264        if (!path)
2265                return ERR_PTR(-ENOMEM);
2266retry:
2267        pos = PATH_MAX - 1;
2268        path[pos] = '\0';
2269
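            /* build the path from the tail of the buffer while walking up
             * towards the root; retry from scratch if rename_lock shows a
             * concurrent rename invalidated the walk */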
2270        seq = read_seqbegin(&rename_lock);
2271        rcu_read_lock();
2272        temp = dentry;
2273        for (;;) {
2274                struct inode *inode;
2275
2276                spin_lock(&temp->d_lock);
2277                inode = d_inode(temp);
2278                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2279                        dout("build_path path+%d: %p SNAPDIR\n",
2280                             pos, temp);
2281                } else if (stop_on_nosnap && inode && dentry != temp &&
2282                           ceph_snap(inode) == CEPH_NOSNAP) {
2283                        spin_unlock(&temp->d_lock);
2284                        pos++; /* get rid of any prepended '/' */
2285                        break;
2286                } else {
2287                        pos -= temp->d_name.len;
2288                        if (pos < 0) {
2289                                spin_unlock(&temp->d_lock);
2290                                break;
2291                        }
2292                        memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2293                }
2294                spin_unlock(&temp->d_lock);
2295                temp = READ_ONCE(temp->d_parent);
2296
2297                /* Are we at the root? */
2298                if (IS_ROOT(temp))
2299                        break;
2300
2301                /* Are we out of buffer? */
2302                if (--pos < 0)
2303                        break;
2304
2305                path[pos] = '/';
2306        }
2307        base = ceph_ino(d_inode(temp));
2308        rcu_read_unlock();
2309
2310        if (read_seqretry(&rename_lock, seq))
2311                goto retry;
2312
2313        if (pos < 0) {
2314                /*
2315                 * A rename didn't occur, but somehow we didn't end up where
2316                 * we thought we would. Throw a warning and try again.
2317                 */
2318                pr_warn("build_path did not end path lookup where "
2319                        "expected, pos is %d\n", pos);
2320                goto retry;
2321        }
2322
2323        *pbase = base;
2324        *plen = PATH_MAX - 1 - pos;
2325        dout("build_path on %p %d built %llx '%.*s'\n",
2326             dentry, d_count(dentry), base, *plen, path + pos);
2327        return path + pos;
2328}
2329
2330static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2331                             const char **ppath, int *ppathlen, u64 *pino,
2332                             bool *pfreepath, bool parent_locked)
2333{
2334        char *path;
2335
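            /* fast path: a locked, non-snapped parent lets us name the
             * dentry as parent ino + name, with no path allocation */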
2336        rcu_read_lock();
2337        if (!dir)
2338                dir = d_inode_rcu(dentry->d_parent);
2339        if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2340                *pino = ceph_ino(dir);
2341                rcu_read_unlock();
2342                *ppath = dentry->d_name.name;
2343                *ppathlen = dentry->d_name.len;
2344                return 0;
2345        }
2346        rcu_read_unlock();
2347        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2348        if (IS_ERR(path))
2349                return PTR_ERR(path);
2350        *ppath = path;
2351        *pfreepath = true;
2352        return 0;
2353}
2354
2355static int build_inode_path(struct inode *inode,
2356                            const char **ppath, int *ppathlen, u64 *pino,
2357                            bool *pfreepath)
2358{
2359        struct dentry *dentry;
2360        char *path;
2361
2362        if (ceph_snap(inode) == CEPH_NOSNAP) {
2363                *pino = ceph_ino(inode);
2364                *ppathlen = 0;
2365                return 0;
2366        }
2367        dentry = d_find_alias(inode);
2368        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2369        dput(dentry);
2370        if (IS_ERR(path))
2371                return PTR_ERR(path);
2372        *ppath = path;
2373        *pfreepath = true;
2374        return 0;
2375}
2376
2377/*
2378 * request arguments may be specified via an inode *, a dentry *, or
2379 * an explicit ino+path.
2380 */
2381static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2382                                  struct inode *rdiri, const char *rpath,
2383                                  u64 rino, const char **ppath, int *pathlen,
2384                                  u64 *ino, bool *freepath, bool parent_locked)
2385{
2386        int r = 0;
2387
2388        if (rinode) {
2389                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2390                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2391                     ceph_snap(rinode));
2392        } else if (rdentry) {
2393                r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2394                                        freepath, parent_locked);
2395                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2396                     *ppath);
2397        } else if (rpath || rino) {
2398                *ino = rino;
2399                *ppath = rpath;
2400                *pathlen = rpath ? strlen(rpath) : 0;
2401                dout(" path %.*s\n", *pathlen, rpath);
2402        }
2403
2404        return r;
2405}
2406
2407/*
2408 * called under mdsc->mutex
2409 */
2410static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2411                                               struct ceph_mds_request *req,
2412                                               int mds, bool drop_cap_releases)
2413{
2414        struct ceph_msg *msg;
2415        struct ceph_mds_request_head *head;
2416        const char *path1 = NULL;
2417        const char *path2 = NULL;
2418        u64 ino1 = 0, ino2 = 0;
2419        int pathlen1 = 0, pathlen2 = 0;
2420        bool freepath1 = false, freepath2 = false;
2421        int len;
2422        u16 releases;
2423        void *p, *end;
2424        int ret;
2425
2426        ret = set_request_path_attr(req->r_inode, req->r_dentry,
2427                              req->r_parent, req->r_path1, req->r_ino1.ino,
2428                              &path1, &pathlen1, &ino1, &freepath1,
2429                              test_bit(CEPH_MDS_R_PARENT_LOCKED,
2430                                        &req->r_req_flags));
2431        if (ret < 0) {
2432                msg = ERR_PTR(ret);
2433                goto out;
2434        }
2435
2436        /* If r_old_dentry is set, then assume that its parent is locked */
2437        ret = set_request_path_attr(NULL, req->r_old_dentry,
2438                              req->r_old_dentry_dir,
2439                              req->r_path2, req->r_ino2.ino,
2440                              &path2, &pathlen2, &ino2, &freepath2, true);
2441        if (ret < 0) {
2442                msg = ERR_PTR(ret);
2443                goto out_free1;
2444        }
2445
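            /* header, two encoded filepaths (1-byte version + u64 ino +
             * u32 string length each), and a timestamp */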
2446        len = sizeof(*head) +
2447                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2448                sizeof(struct ceph_timespec);
2449
2450        /* calculate (max) length for cap releases */
2451        len += sizeof(struct ceph_mds_request_release) *
2452                (!!req->r_inode_drop + !!req->r_dentry_drop +
2453                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2454        if (req->r_dentry_drop)
2455                len += pathlen1;
2456        if (req->r_old_dentry_drop)
2457                len += pathlen2;
2458
2459        msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2460        if (!msg) {
2461                msg = ERR_PTR(-ENOMEM);
2462                goto out_free2;
2463        }
2464
2465        msg->hdr.version = cpu_to_le16(2);
2466        msg->hdr.tid = cpu_to_le64(req->r_tid);
2467
2468        head = msg->front.iov_base;
2469        p = msg->front.iov_base + sizeof(*head);
2470        end = msg->front.iov_base + msg->front.iov_len;
2471
2472        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2473        head->op = cpu_to_le32(req->r_op);
2474        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2475        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2476        head->ino = cpu_to_le64(req->r_deleg_ino);
2477        head->args = req->r_args;
2478
2479        ceph_encode_filepath(&p, end, ino1, path1);
2480        ceph_encode_filepath(&p, end, ino2, path2);
2481
2482        /* make note of release offset, in case we need to replay */
2483        req->r_request_release_offset = p - msg->front.iov_base;
2484
2485        /* cap releases */
2486        releases = 0;
2487        if (req->r_inode_drop)
2488                releases += ceph_encode_inode_release(&p,
2489                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2490                      mds, req->r_inode_drop, req->r_inode_unless,
2491                      req->r_op == CEPH_MDS_OP_READDIR);
2492        if (req->r_dentry_drop)
2493                releases += ceph_encode_dentry_release(&p, req->r_dentry,
2494                                req->r_parent, mds, req->r_dentry_drop,
2495                                req->r_dentry_unless);
2496        if (req->r_old_dentry_drop)
2497                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2498                                req->r_old_dentry_dir, mds,
2499                                req->r_old_dentry_drop,
2500                                req->r_old_dentry_unless);
2501        if (req->r_old_inode_drop)
2502                releases += ceph_encode_inode_release(&p,
2503                      d_inode(req->r_old_dentry),
2504                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2505
2506        if (drop_cap_releases) {
2507                releases = 0;
2508                p = msg->front.iov_base + req->r_request_release_offset;
2509        }
2510
2511        head->num_releases = cpu_to_le16(releases);
2512
2513        /* time stamp */
2514        {
2515                struct ceph_timespec ts;
2516                ceph_encode_timespec64(&ts, &req->r_stamp);
2517                ceph_encode_copy(&p, &ts, sizeof(ts));
2518        }
2519
2520        BUG_ON(p > end);
2521        msg->front.iov_len = p - msg->front.iov_base;
2522        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2523
2524        if (req->r_pagelist) {
2525                struct ceph_pagelist *pagelist = req->r_pagelist;
2526                ceph_msg_data_add_pagelist(msg, pagelist);
2527                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2528        } else {
2529                msg->hdr.data_len = 0;
2530        }
2531
2532        msg->hdr.data_off = cpu_to_le16(0);
2533
2534out_free2:
2535        if (freepath2)
2536                ceph_mdsc_free_path((char *)path2, pathlen2);
2537out_free1:
2538        if (freepath1)
2539                ceph_mdsc_free_path((char *)path1, pathlen1);
2540out:
2541        return msg;
2542}
2543
2544/*
2545 * called under mdsc->mutex if error, under no mutex if
2546 * success.
2547 */
2548static void complete_request(struct ceph_mds_client *mdsc,
2549                             struct ceph_mds_request *req)
2550{
2551        if (req->r_callback)
2552                req->r_callback(mdsc, req);
2553        complete_all(&req->r_completion);
2554}
2555
2556/*
2557 * called under mdsc->mutex
2558 */
2559static int __prepare_send_request(struct ceph_mds_client *mdsc,
2560                                  struct ceph_mds_request *req,
2561                                  int mds, bool drop_cap_releases)
2562{
2563        struct ceph_mds_request_head *rhead;
2564        struct ceph_msg *msg;
2565        int flags = 0;
2566
2567        req->r_attempts++;
2568        if (req->r_inode) {
2569                struct ceph_cap *cap =
2570                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2571
2572                if (cap)
2573                        req->r_sent_on_mseq = cap->mseq;
2574                else
2575                        req->r_sent_on_mseq = -1;
2576        }
2577        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2578             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2579
2580        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2581                void *p;
2582                /*
2583                 * Replay.  Do not regenerate message (and rebuild
2584                 * paths, etc.); just use the original message.
2585                 * Rebuilding paths will break for renames because
2586                 * d_move mangles the src name.
2587                 */
2588                msg = req->r_request;
2589                rhead = msg->front.iov_base;
2590
2591                flags = le32_to_cpu(rhead->flags);
2592                flags |= CEPH_MDS_FLAG_REPLAY;
2593                rhead->flags = cpu_to_le32(flags);
2594
2595                if (req->r_target_inode)
2596                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2597
2598                rhead->num_retry = req->r_attempts - 1;
2599
2600                /* remove cap/dentry releases from message */
2601                rhead->num_releases = 0;
2602
2603                /* re-encode the time stamp where the releases were */
2604                p = msg->front.iov_base + req->r_request_release_offset;
2605                {
2606                        struct ceph_timespec ts;
2607                        ceph_encode_timespec64(&ts, &req->r_stamp);
2608                        ceph_encode_copy(&p, &ts, sizeof(ts));
2609                }
2610
2611                msg->front.iov_len = p - msg->front.iov_base;
2612                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2613                return 0;
2614        }
2615
2616        if (req->r_request) {
2617                ceph_msg_put(req->r_request);
2618                req->r_request = NULL;
2619        }
2620        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2621        if (IS_ERR(msg)) {
2622                req->r_err = PTR_ERR(msg);
2623                return PTR_ERR(msg);
2624        }
2625        req->r_request = msg;
2626
2627        rhead = msg->front.iov_base;
2628        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2629        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2630                flags |= CEPH_MDS_FLAG_REPLAY;
2631        if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2632                flags |= CEPH_MDS_FLAG_ASYNC;
2633        if (req->r_parent)
2634                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2635        rhead->flags = cpu_to_le32(flags);
2636        rhead->num_fwd = req->r_num_fwd;
2637        rhead->num_retry = req->r_attempts - 1;
2638
2639        dout(" r_parent = %p\n", req->r_parent);
2640        return 0;
2641}
2642
2643/*
2644 * called under mdsc->mutex
2645 */
2646static int __send_request(struct ceph_mds_client *mdsc,
2647                          struct ceph_mds_session *session,
2648                          struct ceph_mds_request *req,
2649                          bool drop_cap_releases)
2650{
2651        int err;
2652
2653        err = __prepare_send_request(mdsc, req, session->s_mds,
2654                                     drop_cap_releases);
2655        if (!err) {
2656                ceph_msg_get(req->r_request);
2657                ceph_con_send(&session->s_con, req->r_request);
2658        }
2659
2660        return err;
2661}
2662
2663/*
2664 * send request, or put it on the appropriate wait list.
2665 */
2666static void __do_request(struct ceph_mds_client *mdsc,
2667                        struct ceph_mds_request *req)
2668{
2669        struct ceph_mds_session *session = NULL;
2670        int mds = -1;
2671        int err = 0;
2672        bool random;
2673
2674        if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2675                if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2676                        __unregister_request(mdsc, req);
2677                return;
2678        }
2679
2680        if (req->r_timeout &&
2681            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2682                dout("do_request timed out\n");
2683                err = -ETIMEDOUT;
2684                goto finish;
2685        }
2686        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2687                dout("do_request forced umount\n");
2688                err = -EIO;
2689                goto finish;
2690        }
2691        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2692                if (mdsc->mdsmap_err) {
2693                        err = mdsc->mdsmap_err;
2694                        dout("do_request mdsmap err %d\n", err);
2695                        goto finish;
2696                }
2697                if (mdsc->mdsmap->m_epoch == 0) {
2698                        dout("do_request no mdsmap, waiting for map\n");
2699                        list_add(&req->r_wait, &mdsc->waiting_for_map);
2700                        return;
2701                }
2702                if (!(mdsc->fsc->mount_options->flags &
2703                      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2704                    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2705                        err = -EHOSTUNREACH;
2706                        goto finish;
2707                }
2708        }
2709
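            /* drop the session ref from any previous attempt before choosing */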
2710        put_request_session(req);
2711
2712        mds = __choose_mds(mdsc, req, &random);
2713        if (mds < 0 ||
2714            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
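                    /* an async request must not block waiting for an mds;
                     * hand back -EJUKEBOX so the caller retries synchronously */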
2715                if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2716                        err = -EJUKEBOX;
2717                        goto finish;
2718                }
2719                dout("do_request no mds or not active, waiting for map\n");
2720                list_add(&req->r_wait, &mdsc->waiting_for_map);
2721                return;
2722        }
2723
2724        /* get, open session */
2725        session = __ceph_lookup_mds_session(mdsc, mds);
2726        if (!session) {
2727                session = register_session(mdsc, mds);
2728                if (IS_ERR(session)) {
2729                        err = PTR_ERR(session);
2730                        goto finish;
2731                }
2732        }
2733        req->r_session = ceph_get_mds_session(session);
2734
2735        dout("do_request mds%d session %p state %s\n", mds, session,
2736             ceph_session_state_name(session->s_state));
2737        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2738            session->s_state != CEPH_MDS_SESSION_HUNG) {
2739                if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2740                        err = -EACCES;
2741                        goto out_session;
2742                }
2743                /*
2744                 * We cannot queue async requests since the caps and delegated
2745                 * inodes are bound to the session. Just return -EJUKEBOX and
2746                 * let the caller retry a sync request in that case.
2747                 */
2748                if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2749                        err = -EJUKEBOX;
2750                        goto out_session;
2751                }
2752                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2753                    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2754                        __open_session(mdsc, session);
2755                        /* retry the same mds later */
2756                        if (random)
2757                                req->r_resend_mds = mds;
2758                }
2759                list_add(&req->r_wait, &session->s_waiting);
2760                goto out_session;
2761        }
2762
2763        /* send request */
2764        req->r_resend_mds = -1;   /* forget any previous mds hint */
2765
2766        if (req->r_request_started == 0)   /* note request start time */
2767                req->r_request_started = jiffies;
2768
2769        err = __send_request(mdsc, session, req, false);
2770
2771out_session:
2772        ceph_put_mds_session(session);
2773finish:
2774        if (err) {
2775                dout("__do_request early error %d\n", err);
2776                req->r_err = err;
2777                complete_request(mdsc, req);
2778                __unregister_request(mdsc, req);
2779        }
2780        return;
2781}
2782
2783/*
2784 * called under mdsc->mutex
2785 */
2786static void __wake_requests(struct ceph_mds_client *mdsc,
2787                            struct list_head *head)
2788{
2789        struct ceph_mds_request *req;
2790        LIST_HEAD(tmp_list);
2791
2792        list_splice_init(head, &tmp_list);
2793
2794        while (!list_empty(&tmp_list)) {
2795                req = list_entry(tmp_list.next,
2796                                 struct ceph_mds_request, r_wait);
2797                list_del_init(&req->r_wait);
2798                dout(" wake request %p tid %llu\n", req, req->r_tid);
2799                __do_request(mdsc, req);
2800        }
2801}
2802
2803/*
2804 * Wake up threads with requests pending for @mds, so that they can
2805 * resubmit their requests to a possibly different mds.
2806 */
2807static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2808{
2809        struct ceph_mds_request *req;
2810        struct rb_node *p = rb_first(&mdsc->request_tree);
2811
2812        dout("kick_requests mds%d\n", mds);
2813        while (p) {
2814                req = rb_entry(p, struct ceph_mds_request, r_node);
2815                p = rb_next(p);
2816                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2817                        continue;
2818                if (req->r_attempts > 0)
2819                        continue; /* only new requests */
2820                if (req->r_session &&
2821                    req->r_session->s_mds == mds) {
2822                        dout(" kicking tid %llu\n", req->r_tid);
2823                        list_del_init(&req->r_wait);
2824                        __do_request(mdsc, req);
2825                }
2826        }
2827}
2828
2829int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2830                              struct ceph_mds_request *req)
2831{
2832        int err = 0;
2833
2834        /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2835        if (req->r_inode)
2836                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2837        if (req->r_parent) {
2838                struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2839                int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2840                            CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2841                spin_lock(&ci->i_ceph_lock);
2842                ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2843                __ceph_touch_fmode(ci, mdsc, fmode);
2844                spin_unlock(&ci->i_ceph_lock);
2845                ihold(req->r_parent);
2846        }
2847        if (req->r_old_dentry_dir)
2848                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2849                                  CEPH_CAP_PIN);
2850
2851        if (req->r_inode) {
2852                err = ceph_wait_on_async_create(req->r_inode);
2853                if (err) {
2854                        dout("%s: wait for async create returned: %d\n",
2855                             __func__, err);
2856                        return err;
2857                }
2858        }
2859
2860        if (!err && req->r_old_inode) {
2861                err = ceph_wait_on_async_create(req->r_old_inode);
2862                if (err) {
2863                        dout("%s: wait for async create returned: %d\n",
2864                             __func__, err);
2865                        return err;
2866                }
2867        }
2868
2869        dout("submit_request on %p for inode %p\n", req, dir);
2870        mutex_lock(&mdsc->mutex);
2871        __register_request(mdsc, req, dir);
2872        __do_request(mdsc, req);
2873        err = req->r_err;
2874        mutex_unlock(&mdsc->mutex);
2875        return err;
2876}
2877
2878static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2879                                  struct ceph_mds_request *req)
2880{
2881        int err;
2882
2883        /* wait */
2884        dout("do_request waiting\n");
2885        if (!req->r_timeout && req->r_wait_for_completion) {
2886                err = req->r_wait_for_completion(mdsc, req);
2887        } else {
2888                long timeleft = wait_for_completion_killable_timeout(
2889                                        &req->r_completion,
2890                                        ceph_timeout_jiffies(req->r_timeout));
2891                if (timeleft > 0)
2892                        err = 0;
2893                else if (!timeleft)
2894                        err = -ETIMEDOUT;  /* timed out */
2895                else
2896                        err = timeleft;  /* killed */
2897        }
2898        dout("do_request waited, got %d\n", err);
2899        mutex_lock(&mdsc->mutex);
2900
2901        /* only abort if we didn't race with a real reply */
2902        if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2903                err = le32_to_cpu(req->r_reply_info.head->result);
2904        } else if (err < 0) {
2905                dout("aborted request %lld with %d\n", req->r_tid, err);
2906
2907                /*
2908                 * ensure we aren't running concurrently with
2909                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2910                 * rely on locks (dir mutex) held by our caller.
2911                 */
2912                mutex_lock(&req->r_fill_mutex);
2913                req->r_err = err;
2914                set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2915                mutex_unlock(&req->r_fill_mutex);
2916
2917                if (req->r_parent &&
2918                    (req->r_op & CEPH_MDS_OP_WRITE))
2919                        ceph_invalidate_dir_request(req);
2920        } else {
2921                err = req->r_err;
2922        }
2923
2924        mutex_unlock(&mdsc->mutex);
2925        return err;
2926}
2927
2928/*
2929 * Synchronously perform an mds request.  Takes care of all of the
2930 * session setup, forwarding, and retry details.
2931 */
2932int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2933                         struct inode *dir,
2934                         struct ceph_mds_request *req)
2935{
2936        int err;
2937
2938        dout("do_request on %p\n", req);
2939
2940        /* issue */
2941        err = ceph_mdsc_submit_request(mdsc, dir, req);
2942        if (!err)
2943                err = ceph_mdsc_wait_request(mdsc, req);
2944        dout("do_request %p done, result %d\n", req, err);
2945        return err;
2946}
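
/*
 * Editorial sketch (not part of the original file): how a typical
 * caller drives ceph_mdsc_do_request().  The setup below is modeled on
 * the lookup path in fs/ceph/dir.c; details such as r_num_caps = 2 are
 * assumptions borrowed from that pattern, not a definitive recipe.
 */
static inline int example_mdsc_lookup(struct ceph_mds_client *mdsc,
                                      struct inode *dir,
                                      struct dentry *dentry)
{
        struct ceph_mds_request *req;
        int err;

        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->r_dentry = dget(dentry);   /* ref dropped when req is freed */
        req->r_num_caps = 2;
        req->r_parent = dir;            /* submit path takes its own ihold */
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        ceph_mdsc_put_request(req);     /* drops refs taken at submit time */
        return err;
}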
2947
2948/*
2949 * Invalidate dir's completeness and dentry lease state on an aborted MDS
2950 * namespace request.
2951 */
2952void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2953{
2954        struct inode *dir = req->r_parent;
2955        struct inode *old_dir = req->r_old_dentry_dir;
2956
2957        dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2958
2959        ceph_dir_clear_complete(dir);
2960        if (old_dir)
2961                ceph_dir_clear_complete(old_dir);
2962        if (req->r_dentry)
2963                ceph_invalidate_dentry_lease(req->r_dentry);
2964        if (req->r_old_dentry)
2965                ceph_invalidate_dentry_lease(req->r_old_dentry);
2966}
2967
2968/*
2969 * Handle mds reply.
2970 *
2971 * We take the session mutex and parse and process the reply immediately.
2972 * This preserves the logical ordering of replies, capabilities, etc., sent
2973 * by the MDS as they are applied to our local cache.
2974 */
2975static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2976{
2977        struct ceph_mds_client *mdsc = session->s_mdsc;
2978        struct ceph_mds_request *req;
2979        struct ceph_mds_reply_head *head = msg->front.iov_base;
2980        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2981        struct ceph_snap_realm *realm;
2982        u64 tid;
2983        int err, result;
2984        int mds = session->s_mds;
2985
2986        if (msg->front.iov_len < sizeof(*head)) {
2987                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2988                ceph_msg_dump(msg);
2989                return;
2990        }
2991
2992        /* get request, session */
2993        tid = le64_to_cpu(msg->hdr.tid);
2994        mutex_lock(&mdsc->mutex);
2995        req = lookup_get_request(mdsc, tid);
2996        if (!req) {
2997                dout("handle_reply on unknown tid %llu\n", tid);
2998                mutex_unlock(&mdsc->mutex);
2999                return;
3000        }
3001        dout("handle_reply %p\n", req);
3002
3003        /* correct session? */
3004        if (req->r_session != session) {
3005                pr_err("mdsc_handle_reply got %llu on session mds%d"
3006                       " not mds%d\n", tid, session->s_mds,
3007                       req->r_session ? req->r_session->s_mds : -1);
3008                mutex_unlock(&mdsc->mutex);
3009                goto out;
3010        }
3011
3012        /* dup? */
3013        if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3014            (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3015                pr_warn("got a dup %s reply on %llu from mds%d\n",
3016                           head->safe ? "safe" : "unsafe", tid, mds);
3017                mutex_unlock(&mdsc->mutex);
3018                goto out;
3019        }
3020        if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3021                pr_warn("got unsafe after safe on %llu from mds%d\n",
3022                           tid, mds);
3023                mutex_unlock(&mdsc->mutex);
3024                goto out;
3025        }
3026
3027        result = le32_to_cpu(head->result);
3028
3029        /*
3030         * Handle an ESTALE:
3031         * - if we're not talking to the authority, resend to it
3032         * - if the authority has changed while we weren't looking,
3033         *   resend to the new authority
3034         * - otherwise we just have to return an ESTALE
3035         */
3036        if (result == -ESTALE) {
3037                dout("got ESTALE on request %llu\n", req->r_tid);
3038                req->r_resend_mds = -1;
3039                if (req->r_direct_mode != USE_AUTH_MDS) {
3040                        dout("not using auth, switching to auth mds now\n");
3041                        req->r_direct_mode = USE_AUTH_MDS;
3042                        __do_request(mdsc, req);
3043                        mutex_unlock(&mdsc->mutex);
3044                        goto out;
3045                } else {
3046                        int mds = __choose_mds(mdsc, req, NULL);
3047                        if (mds >= 0 && mds != req->r_session->s_mds) {
3048                                dout("but auth changed, so resending\n");
3049                                __do_request(mdsc, req);
3050                                mutex_unlock(&mdsc->mutex);
3051                                goto out;
3052                        }
3053                }
3054                dout("have to return ESTALE on request %llu\n", req->r_tid);
3055        }
3056
3058        if (head->safe) {
3059                set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3060                __unregister_request(mdsc, req);
3061
3062                /* last request during umount? */
3063                if (mdsc->stopping && !__get_oldest_req(mdsc))
3064                        complete_all(&mdsc->safe_umount_waiters);
3065
3066                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3067                        /*
3068                         * We already handled the unsafe response, now do the
3069                         * cleanup.  No need to examine the response; the MDS
3070                         * doesn't include any result info in the safe
3071                         * response.  And even if it did, there is nothing
3072                         * useful we could do with a revised return value.
3073                         */
3074                        dout("got safe reply %llu, mds%d\n", tid, mds);
3075
3076                        mutex_unlock(&mdsc->mutex);
3077                        goto out;
3078                }
3079        } else {
3080                set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3081                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3082        }
3083
3084        dout("handle_reply tid %lld result %d\n", tid, result);
3085        rinfo = &req->r_reply_info;
3086        if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3087                err = parse_reply_info(session, msg, rinfo, (u64)-1);
3088        else
3089                err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3090        mutex_unlock(&mdsc->mutex);
3091
3092        mutex_lock(&session->s_mutex);
3093        if (err < 0) {
3094                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3095                ceph_msg_dump(msg);
3096                goto out_err;
3097        }
3098
3099        /* snap trace */
3100        realm = NULL;
3101        if (rinfo->snapblob_len) {
3102                down_write(&mdsc->snap_rwsem);
3103                ceph_update_snap_trace(mdsc, rinfo->snapblob,
3104                                rinfo->snapblob + rinfo->snapblob_len,
3105                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3106                                &realm);
3107                downgrade_write(&mdsc->snap_rwsem);
3108        } else {
3109                down_read(&mdsc->snap_rwsem);
3110        }
3111
3112        /* insert trace into our cache */
3113        mutex_lock(&req->r_fill_mutex);
3114        current->journal_info = req;
3115        err = ceph_fill_trace(mdsc->fsc->sb, req);
3116        if (err == 0) {
3117                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3118                                    req->r_op == CEPH_MDS_OP_LSSNAP))
3119                        ceph_readdir_prepopulate(req, req->r_session);
3120        }
3121        current->journal_info = NULL;
3122        mutex_unlock(&req->r_fill_mutex);
3123
3124        up_read(&mdsc->snap_rwsem);
3125        if (realm)
3126                ceph_put_snap_realm(mdsc, realm);
3127
3128        if (err == 0) {
3129                if (req->r_target_inode &&
3130                    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3131                        struct ceph_inode_info *ci =
3132                                ceph_inode(req->r_target_inode);
3133                        spin_lock(&ci->i_unsafe_lock);
3134                        list_add_tail(&req->r_unsafe_target_item,
3135                                      &ci->i_unsafe_iops);
3136                        spin_unlock(&ci->i_unsafe_lock);
3137                }
3138
3139                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3140        }
3141out_err:
3142        mutex_lock(&mdsc->mutex);
3143        if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3144                if (err) {
3145                        req->r_err = err;
3146                } else {
3147                        req->r_reply = ceph_msg_get(msg);
3148                        set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3149                }
3150        } else {
3151                dout("reply arrived after request %lld was aborted\n", tid);
3152        }
3153        mutex_unlock(&mdsc->mutex);
3154
3155        mutex_unlock(&session->s_mutex);
3156
3157        /* kick calling process */
3158        complete_request(mdsc, req);
3159out:
3160        ceph_mdsc_put_request(req);
3161        return;
3162}
3163
3166/*
3167 * handle mds notification that our request has been forwarded.
3168 */
3169static void handle_forward(struct ceph_mds_client *mdsc,
3170                           struct ceph_mds_session *session,
3171                           struct ceph_msg *msg)
3172{
3173        struct ceph_mds_request *req;
3174        u64 tid = le64_to_cpu(msg->hdr.tid);
3175        u32 next_mds;
3176        u32 fwd_seq;
3177        int err = -EINVAL;
3178        void *p = msg->front.iov_base;
3179        void *end = p + msg->front.iov_len;
3180
3181        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3182        next_mds = ceph_decode_32(&p);
3183        fwd_seq = ceph_decode_32(&p);
3184
3185        mutex_lock(&mdsc->mutex);
3186        req = lookup_get_request(mdsc, tid);
3187        if (!req) {
3188                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3189                goto out;  /* dup reply? */
3190        }
3191
3192        if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3193                dout("forward tid %llu aborted, unregistering\n", tid);
3194                __unregister_request(mdsc, req);
3195        } else if (fwd_seq <= req->r_num_fwd) {
3196                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3197                     tid, next_mds, fwd_seq, req->r_num_fwd);
3198        } else {
3199                /* resend. forward race not possible; mds would drop */
3200                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3201                BUG_ON(req->r_err);
3202                BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3203                req->r_attempts = 0;
3204                req->r_num_fwd = fwd_seq;
3205                req->r_resend_mds = next_mds;
3206                put_request_session(req);
3207                __do_request(mdsc, req);
3208        }
3209        ceph_mdsc_put_request(req);
3210out:
3211        mutex_unlock(&mdsc->mutex);
3212        return;
3213
3214bad:
3215        pr_err("mdsc_handle_forward decode error err=%d\n", err);
3216}
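
/*
 * Editorial note (not in the original source): the forward message
 * body decoded above is simply two 32-bit values following the common
 * header (the tid lives in msg->hdr.tid):
 *
 *	u32 next_mds	rank the request was forwarded to
 *	u32 fwd_seq	number of times the request has been forwarded
 */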
3217
3218static int __decode_session_metadata(void **p, void *end,
3219                                     bool *blacklisted)
3220{
3221        /* map<string,string> */
3222        u32 n;
3223        bool err_str;
3224        ceph_decode_32_safe(p, end, n, bad);
3225        while (n-- > 0) {
3226                u32 len;
3227                ceph_decode_32_safe(p, end, len, bad);
3228                ceph_decode_need(p, end, len, bad);
3229                err_str = !strncmp(*p, "error_string", len);
3230                *p += len;
3231                ceph_decode_32_safe(p, end, len, bad);
3232                ceph_decode_need(p, end, len, bad);
3233                if (err_str && strnstr(*p, "blacklisted", len))
3234                        *blacklisted = true;
3235                *p += len;
3236        }
3237        return 0;
3238bad:
3239        return -1;
3240}
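
/*
 * Editorial note (not in the original source): the metadata blob
 * decoded above is a standard ceph map<string,string> encoding:
 *
 *	u32 n				number of key/value pairs
 *	n * {
 *		u32 klen, klen bytes	key
 *		u32 vlen, vlen bytes	value
 *	}
 *
 * We only care whether an "error_string" value mentions "blacklisted".
 */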
3241
3242/*
3243 * handle a mds session control message
3244 */
3245static void handle_session(struct ceph_mds_session *session,
3246                           struct ceph_msg *msg)
3247{
3248        struct ceph_mds_client *mdsc = session->s_mdsc;
3249        int mds = session->s_mds;
3250        int msg_version = le16_to_cpu(msg->hdr.version);
3251        void *p = msg->front.iov_base;
3252        void *end = p + msg->front.iov_len;
3253        struct ceph_mds_session_head *h;
3254        u32 op;
3255        u64 seq, features = 0;
3256        int wake = 0;
3257        bool blacklisted = false;
3258
3259        /* decode */
3260        ceph_decode_need(&p, end, sizeof(*h), bad);
3261        h = p;
3262        p += sizeof(*h);
3263
3264        op = le32_to_cpu(h->op);
3265        seq = le64_to_cpu(h->seq);
3266
3267        if (msg_version >= 3) {
3268                u32 len;
3269                /* version >= 2, metadata */
3270                if (__decode_session_metadata(&p, end, &blacklisted) < 0)
3271                        goto bad;
3272                /* version >= 3, feature bits */
3273                ceph_decode_32_safe(&p, end, len, bad);
3274                if (len) {
3275                        ceph_decode_64_safe(&p, end, features, bad);
3276                        p += len - sizeof(features);
3277                }
3278        }
3279
3280        mutex_lock(&mdsc->mutex);
3281        if (op == CEPH_SESSION_CLOSE) {
3282                ceph_get_mds_session(session);
3283                __unregister_session(mdsc, session);
3284        }
3285        /* FIXME: this ttl calculation is generous */
3286        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3287        mutex_unlock(&mdsc->mutex);
3288
3289        mutex_lock(&session->s_mutex);
3290
3291        dout("handle_session mds%d %s %p state %s seq %llu\n",
3292             mds, ceph_session_op_name(op), session,
3293             ceph_session_state_name(session->s_state), seq);
3294
3295        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3296                session->s_state = CEPH_MDS_SESSION_OPEN;
3297                pr_info("mds%d came back\n", session->s_mds);
3298        }
3299
3300        switch (op) {
3301        case CEPH_SESSION_OPEN:
3302                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3303                        pr_info("mds%d reconnect success\n", session->s_mds);
3304                session->s_state = CEPH_MDS_SESSION_OPEN;
3305                session->s_features = features;
3306                renewed_caps(mdsc, session, 0);
3307                wake = 1;
3308                if (mdsc->stopping)
3309                        __close_session(mdsc, session);
3310                break;
3311
3312        case CEPH_SESSION_RENEWCAPS:
3313                if (session->s_renew_seq == seq)
3314                        renewed_caps(mdsc, session, 1);
3315                break;
3316
3317        case CEPH_SESSION_CLOSE:
3318                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3319                        pr_info("mds%d reconnect denied\n", session->s_mds);
3320                session->s_state = CEPH_MDS_SESSION_CLOSED;
3321                cleanup_session_requests(mdsc, session);
3322                remove_session_caps(session);
3323                wake = 2; /* for good measure */
3324                wake_up_all(&mdsc->session_close_wq);
3325                break;
3326
3327        case CEPH_SESSION_STALE:
3328                pr_info("mds%d caps went stale, renewing\n",
3329                        session->s_mds);
3330                spin_lock(&session->s_gen_ttl_lock);
3331                session->s_cap_gen++;
3332                session->s_cap_ttl = jiffies - 1;
3333                spin_unlock(&session->s_gen_ttl_lock);
3334                send_renew_caps(mdsc, session);
3335                break;
3336
3337        case CEPH_SESSION_RECALL_STATE:
3338                ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3339                break;
3340
3341        case CEPH_SESSION_FLUSHMSG:
3342                send_flushmsg_ack(mdsc, session, seq);
3343                break;
3344
3345        case CEPH_SESSION_FORCE_RO:
3346                dout("force_session_readonly %p\n", session);
3347                spin_lock(&session->s_cap_lock);
3348                session->s_readonly = true;
3349                spin_unlock(&session->s_cap_lock);
3350                wake_up_session_caps(session, FORCE_RO);
3351                break;
3352
3353        case CEPH_SESSION_REJECT:
3354                WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3355                pr_info("mds%d rejected session\n", session->s_mds);
3356                session->s_state = CEPH_MDS_SESSION_REJECTED;
3357                cleanup_session_requests(mdsc, session);
3358                remove_session_caps(session);
3359                if (blacklisted)
3360                        mdsc->fsc->blacklisted = true;
3361                wake = 2; /* for good measure */
3362                break;
3363
3364        default:
3365                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3366                WARN_ON(1);
3367        }
3368
3369        mutex_unlock(&session->s_mutex);
3370        if (wake) {
3371                mutex_lock(&mdsc->mutex);
3372                __wake_requests(mdsc, &session->s_waiting);
3373                if (wake == 2)
3374                        kick_requests(mdsc, mds);
3375                mutex_unlock(&mdsc->mutex);
3376        }
3377        if (op == CEPH_SESSION_CLOSE)
3378                ceph_put_mds_session(session);
3379        return;
3380
3381bad:
3382        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3383               (int)msg->front.iov_len);
3384        ceph_msg_dump(msg);
3385        return;
3386}
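
/*
 * Editorial summary (not in the original source) of the session ops
 * handled above:
 *
 *	OPEN		-> state OPEN, record features, renew caps
 *	RENEWCAPS	-> confirm a renew we requested (seq must match)
 *	CLOSE		-> state CLOSED, drop requests and caps
 *	STALE		-> bump s_cap_gen, expire cap TTL, send a renew
 *	RECALL_STATE	-> trim caps down to h->max_caps
 *	FLUSHMSG	-> ack with a FLUSHMSG_ACK
 *	FORCE_RO	-> mark the session read-only
 *	REJECT		-> state REJECTED, possibly note blacklisting
 */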
3387
3388void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3389{
3390        int dcaps;
3391
3392        dcaps = xchg(&req->r_dir_caps, 0);
3393        if (dcaps) {
3394                dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3395                ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3396        }
3397}
3398
3399void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3400{
3401        int dcaps;
3402
3403        dcaps = xchg(&req->r_dir_caps, 0);
3404        if (dcaps) {
3405                dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3406                ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3407                                                dcaps);
3408        }
3409}
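
/*
 * Editorial note (not in the original source): both helpers above use
 * xchg() to claim r_dir_caps atomically.  If two paths (e.g. reply
 * handling and request cleanup) race to release the same caps, only
 * the first caller sees a nonzero value, so the cap references are
 * put exactly once.
 */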
3410
3411/*
3412 * called under session->mutex.
3413 */
3414static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3415                                   struct ceph_mds_session *session)
3416{
3417        struct ceph_mds_request *req, *nreq;
3418        struct rb_node *p;
3419
3420        dout("replay_unsafe_requests mds%d\n", session->s_mds);
3421
3422        mutex_lock(&mdsc->mutex);
3423        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3424                __send_request(mdsc, session, req, true);
3425
3426        /*
3427         * Also re-send old requests when the MDS enters the reconnect stage,
3428         * so that it can process completed requests in its clientreplay stage.
3429         */
3430        p = rb_first(&mdsc->request_tree);
3431        while (p) {
3432                req = rb_entry(p, struct ceph_mds_request, r_node);
3433                p = rb_next(p);
3434                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3435                        continue;
3436                if (req->r_attempts == 0)
3437                        continue; /* only old requests */
3438                if (!req->r_session)
3439                        continue;
3440                if (req->r_session->s_mds != session->s_mds)
3441                        continue;
3442
3443                ceph_mdsc_release_dir_caps_no_check(req);
3444
3445                __send_request(mdsc, session, req, true);
3446        }
3447        mutex_unlock(&mdsc->mutex);
3448}
3449
3450static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3451{
3452        struct ceph_msg *reply;
3453        struct ceph_pagelist *_pagelist;
3454        struct page *page;
3455        __le32 *addr;
3456        int err = -ENOMEM;
3457
3458        if (!recon_state->allow_multi)
3459                return -ENOSPC;
3460
3461        /* can't handle a message with both caps and realms; exactly one of the two counts must be nonzero */
3462        BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3463
3464        /* pre-allocate new pagelist */
3465        _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3466        if (!_pagelist)
3467                return -ENOMEM;
3468
3469        reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3470        if (!reply)
3471                goto fail_msg;
3472
3473        /* placeholder for nr_caps */
3474        err = ceph_pagelist_encode_32(_pagelist, 0);
3475        if (err < 0)
3476                goto fail;
3477
3478        if (recon_state->nr_caps) {
3479                /* currently encoding caps */
3480                err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3481                if (err)
3482                        goto fail;
3483        } else {
3484                /* placeholder for nr_realms (currently encoding realms) */
3485                err = ceph_pagelist_encode_32(_pagelist, 0);
3486                if (err < 0)
3487                        goto fail;
3488        }
3489
3490        err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3491        if (err)
3492                goto fail;
3493
3494        page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3495        addr = kmap_atomic(page);
3496        if (recon_state->nr_caps) {
3497                /* currently encoding caps */
3498                *addr = cpu_to_le32(recon_state->nr_caps);
3499        } else {
3500                /* currently encoding realms */
3501                *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3502        }
3503        kunmap_atomic(addr);
3504
3505        reply->hdr.version = cpu_to_le16(5);
3506        reply->hdr.compat_version = cpu_to_le16(4);
3507
3508        reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3509        ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3510
3511        ceph_con_send(&recon_state->session->s_con, reply);
3512        ceph_pagelist_release(recon_state->pagelist);
3513
3514        recon_state->pagelist = _pagelist;
3515        recon_state->nr_caps = 0;
3516        recon_state->nr_realms = 0;
3517        recon_state->msg_version = 5;
3518        return 0;
3519fail:
3520        ceph_msg_put(reply);
3521fail_msg:
3522        ceph_pagelist_release(_pagelist);
3523        return err;
3524}
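
/*
 * Editorial sketch (not part of the original file): the placeholder
 * pattern used above and in send_mds_reconnect().  A zero u32 is
 * encoded before the final count is known; because it is the first
 * thing in the pagelist, it can be patched later through the first
 * page.
 */
static inline void example_patch_leading_count(struct ceph_pagelist *pl,
                                               u32 count)
{
        struct page *page = list_first_entry(&pl->head, struct page, lru);
        __le32 *addr = kmap_atomic(page);

        *addr = cpu_to_le32(count);     /* overwrite the placeholder */
        kunmap_atomic(addr);
}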
3525
3526/*
3527 * Encode information about a cap for a reconnect with the MDS.
3528 */
3529static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3530                          void *arg)
3531{
3532        union {
3533                struct ceph_mds_cap_reconnect v2;
3534                struct ceph_mds_cap_reconnect_v1 v1;
3535        } rec;
3536        struct ceph_inode_info *ci = cap->ci;
3537        struct ceph_reconnect_state *recon_state = arg;
3538        struct ceph_pagelist *pagelist = recon_state->pagelist;
3539        int err;
3540        u64 snap_follows;
3541
3542        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3543             inode, ceph_vinop(inode), cap, cap->cap_id,
3544             ceph_cap_string(cap->issued));
3545
3546        spin_lock(&ci->i_ceph_lock);
3547        cap->seq = 0;        /* reset cap seq */
3548        cap->issue_seq = 0;  /* and issue_seq */
3549        cap->mseq = 0;       /* and migrate_seq */
3550        cap->cap_gen = cap->session->s_cap_gen;
3551
3552        /* These are lost when the session goes away */
3553        if (S_ISDIR(inode->i_mode)) {
3554                if (cap->issued & CEPH_CAP_DIR_CREATE) {
3555                        ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3556                        memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3557                }
3558                cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3559        }
3560
3561        if (recon_state->msg_version >= 2) {
3562                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3563                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3564                rec.v2.issued = cpu_to_le32(cap->issued);
3565                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3566                rec.v2.pathbase = 0;
3567                rec.v2.flock_len = (__force __le32)
3568                        ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3569        } else {
3570                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3571                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3572                rec.v1.issued = cpu_to_le32(cap->issued);
3573                rec.v1.size = cpu_to_le64(inode->i_size);
3574                ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3575                ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3576                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3577                rec.v1.pathbase = 0;
3578        }
3579
3580        if (list_empty(&ci->i_cap_snaps)) {
3581                snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3582        } else {
3583                struct ceph_cap_snap *capsnap =
3584                        list_first_entry(&ci->i_cap_snaps,
3585                                         struct ceph_cap_snap, ci_item);
3586                snap_follows = capsnap->follows;
3587        }
3588        spin_unlock(&ci->i_ceph_lock);
3589
3590        if (recon_state->msg_version >= 2) {
3591                int num_fcntl_locks, num_flock_locks;
3592                struct ceph_filelock *flocks = NULL;
3593                size_t struct_len, total_len = sizeof(u64);
3594                u8 struct_v = 0;
3595
3596encode_again:
3597                if (rec.v2.flock_len) {
3598                        ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3599                } else {
3600                        num_fcntl_locks = 0;
3601                        num_flock_locks = 0;
3602                }
3603                if (num_fcntl_locks + num_flock_locks > 0) {
3604                        flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3605                                               sizeof(struct ceph_filelock),
3606                                               GFP_NOFS);
3607                        if (!flocks) {
3608                                err = -ENOMEM;
3609                                goto out_err;
3610                        }
3611                        err = ceph_encode_locks_to_buffer(inode, flocks,
3612                                                          num_fcntl_locks,
3613                                                          num_flock_locks);
3614                        if (err) {
3615                                kfree(flocks);
3616                                flocks = NULL;
3617                                if (err == -ENOSPC)
3618                                        goto encode_again;
3619                                goto out_err;
3620                        }
3621                } else {
3622                        kfree(flocks);
3623                        flocks = NULL;
3624                }
3625
3626                if (recon_state->msg_version >= 3) {
3627                        /* version, compat_version and struct_len */
3628                        total_len += 2 * sizeof(u8) + sizeof(u32);
3629                        struct_v = 2;
3630                }
3631                /*
3632                 * number of encoded locks is stable, so copy to pagelist
3633                 */
3634                struct_len = 2 * sizeof(u32) +
3635                            (num_fcntl_locks + num_flock_locks) *
3636                            sizeof(struct ceph_filelock);
3637                rec.v2.flock_len = cpu_to_le32(struct_len);
3638
3639                struct_len += sizeof(u32) + sizeof(rec.v2);
3640
3641                if (struct_v >= 2)
3642                        struct_len += sizeof(u64); /* snap_follows */
3643
3644                total_len += struct_len;
3645
3646                if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3647                        err = send_reconnect_partial(recon_state);
3648                        if (err)
3649                                goto out_freeflocks;
3650                        pagelist = recon_state->pagelist;
3651                }
3652
3653                err = ceph_pagelist_reserve(pagelist, total_len);
3654                if (err)
3655                        goto out_freeflocks;
3656
3657                ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3658                if (recon_state->msg_version >= 3) {
3659                        ceph_pagelist_encode_8(pagelist, struct_v);
3660                        ceph_pagelist_encode_8(pagelist, 1);
3661                        ceph_pagelist_encode_32(pagelist, struct_len);
3662                }
3663                ceph_pagelist_encode_string(pagelist, NULL, 0);
3664                ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3665                ceph_locks_to_pagelist(flocks, pagelist,
3666                                       num_fcntl_locks, num_flock_locks);
3667                if (struct_v >= 2)
3668                        ceph_pagelist_encode_64(pagelist, snap_follows);
3669out_freeflocks:
3670                kfree(flocks);
3671        } else {
3672                u64 pathbase = 0;
3673                int pathlen = 0;
3674                char *path = NULL;
3675                struct dentry *dentry;
3676
3677                dentry = d_find_alias(inode);
3678                if (dentry) {
3679                        path = ceph_mdsc_build_path(dentry,
3680                                                &pathlen, &pathbase, 0);
3681                        dput(dentry);
3682                        if (IS_ERR(path)) {
3683                                err = PTR_ERR(path);
3684                                goto out_err;
3685                        }
3686                        rec.v1.pathbase = cpu_to_le64(pathbase);
3687                }
3688
3689                err = ceph_pagelist_reserve(pagelist,
3690                                            sizeof(u64) + sizeof(u32) +
3691                                            pathlen + sizeof(rec.v1));
3692                if (err) {
3693                        goto out_freepath;
3694                }
3695
3696                ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3697                ceph_pagelist_encode_string(pagelist, path, pathlen);
3698                ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3699out_freepath:
3700                ceph_mdsc_free_path(path, pathlen);
3701        }
3702
3703out_err:
3704        if (err >= 0)
3705                recon_state->nr_caps++;
3706        return err;
3707}
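
/*
 * Editorial note (not in the original source): for msg_version >= 2
 * the callback above emits one record per cap, laid out as:
 *
 *	u64 ino
 *	(v3+) u8 struct_v, u8 compat, u32 struct_len
 *	string path			(empty here, pathbase = 0)
 *	struct ceph_mds_cap_reconnect	(the v2 record)
 *	u32 num_fcntl_locks, u32 num_flock_locks, filelock array
 *	(struct_v >= 2) u64 snap_follows
 */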
3708
3709static int encode_snap_realms(struct ceph_mds_client *mdsc,
3710                              struct ceph_reconnect_state *recon_state)
3711{
3712        struct rb_node *p;
3713        struct ceph_pagelist *pagelist = recon_state->pagelist;
3714        int err = 0;
3715
3716        if (recon_state->msg_version >= 4) {
3717                err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3718                if (err < 0)
3719                        goto fail;
3720        }
3721
3722        /*
3723         * snaprealms.  we provide the mds with the ino, seq (version), and
3724         * parent for all of our realms.  If the mds has any newer info,
3725         * it will tell us.
3726         */
3727        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3728                struct ceph_snap_realm *realm =
3729                       rb_entry(p, struct ceph_snap_realm, node);
3730                struct ceph_mds_snaprealm_reconnect sr_rec;
3731
3732                if (recon_state->msg_version >= 4) {
3733                        size_t need = sizeof(u8) * 2 + sizeof(u32) +
3734                                      sizeof(sr_rec);
3735
3736                        if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3737                                err = send_reconnect_partial(recon_state);
3738                                if (err)
3739                                        goto fail;
3740                                pagelist = recon_state->pagelist;
3741                        }
3742
3743                        err = ceph_pagelist_reserve(pagelist, need);
3744                        if (err)
3745                                goto fail;
3746
3747                        ceph_pagelist_encode_8(pagelist, 1);
3748                        ceph_pagelist_encode_8(pagelist, 1);
3749                        ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3750                }
3751
3752                dout(" adding snap realm %llx seq %lld parent %llx\n",
3753                     realm->ino, realm->seq, realm->parent_ino);
3754                sr_rec.ino = cpu_to_le64(realm->ino);
3755                sr_rec.seq = cpu_to_le64(realm->seq);
3756                sr_rec.parent = cpu_to_le64(realm->parent_ino);
3757
3758                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3759                if (err)
3760                        goto fail;
3761
3762                recon_state->nr_realms++;
3763        }
3764fail:
3765        return err;
3766}
3767
3768
3769/*
3770 * If an MDS fails and recovers, clients need to reconnect in order to
3771 * reestablish shared state.  This includes all caps issued through
3772 * this session _and_ the snap_realm hierarchy.  Because it's not
3773 * clear which snap realms the mds cares about, we send everything we
3774         * know about; that ensures we'll then get any new info the
3775 * recovering MDS might have.
3776 *
3777 * This is a relatively heavyweight operation, but it's rare.
3778 */
3779static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3780                               struct ceph_mds_session *session)
3781{
3782        struct ceph_msg *reply;
3783        int mds = session->s_mds;
3784        int err = -ENOMEM;
3785        struct ceph_reconnect_state recon_state = {
3786                .session = session,
3787        };
3788        LIST_HEAD(dispose);
3789
3790        pr_info("mds%d reconnect start\n", mds);
3791
3792        recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3793        if (!recon_state.pagelist)
3794                goto fail_nopagelist;
3795
3796        reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3797        if (!reply)
3798                goto fail_nomsg;
3799
3800        xa_destroy(&session->s_delegated_inos);
3801
3802        mutex_lock(&session->s_mutex);
3803        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3804        session->s_seq = 0;
3805
3806        dout("session %p state %s\n", session,
3807             ceph_session_state_name(session->s_state));
3808
3809        spin_lock(&session->s_gen_ttl_lock);
3810        session->s_cap_gen++;
3811        spin_unlock(&session->s_gen_ttl_lock);
3812
3813        spin_lock(&session->s_cap_lock);
3814        /* don't know if session is readonly */
3815        session->s_readonly = 0;
3816        /*
3817         * notify __ceph_remove_cap() that we are composing cap reconnect.
3818         * If a cap gets released before being added to the cap reconnect,
3819         * __ceph_remove_cap() should skip queuing cap release.
3820         */
3821        session->s_cap_reconnect = 1;
3822        /* drop old cap expires; we're about to reestablish that state */
3823        detach_cap_releases(session, &dispose);
3824        spin_unlock(&session->s_cap_lock);
3825        dispose_cap_releases(mdsc, &dispose);
3826
3827        /* trim unused caps to reduce MDS's cache rejoin time */
3828        if (mdsc->fsc->sb->s_root)
3829                shrink_dcache_parent(mdsc->fsc->sb->s_root);
3830
3831        ceph_con_close(&session->s_con);
3832        ceph_con_open(&session->s_con,
3833                      CEPH_ENTITY_TYPE_MDS, mds,
3834                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3835
3836        /* replay unsafe requests */
3837        replay_unsafe_requests(mdsc, session);
3838
3839        ceph_early_kick_flushing_caps(mdsc, session);
3840
3841        down_read(&mdsc->snap_rwsem);
3842
3843        /* placeholder for nr_caps */
3844        err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3845        if (err)
3846                goto fail;
3847
3848        if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3849                recon_state.msg_version = 3;
3850                recon_state.allow_multi = true;
3851        } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3852                recon_state.msg_version = 3;
3853        } else {
3854                recon_state.msg_version = 2;
3855        }
3856        /* traverse this session's caps */
3857        err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
3858
3859        spin_lock(&session->s_cap_lock);
3860        session->s_cap_reconnect = 0;
3861        spin_unlock(&session->s_cap_lock);
3862
3863        if (err < 0)
3864                goto fail;
3865
3866        /* check if all realms can be encoded into current message */
3867        if (mdsc->num_snap_realms) {
3868                size_t total_len =
3869                        recon_state.pagelist->length +
3870                        mdsc->num_snap_realms *
3871                        sizeof(struct ceph_mds_snaprealm_reconnect);
3872                if (recon_state.msg_version >= 4) {
3873                        /* number of realms */
3874                        total_len += sizeof(u32);
3875                        /* version, compat_version and struct_len */
3876                        total_len += mdsc->num_snap_realms *
3877                                     (2 * sizeof(u8) + sizeof(u32));
3878                }
3879                if (total_len > RECONNECT_MAX_SIZE) {
3880                        if (!recon_state.allow_multi) {
3881                                err = -ENOSPC;
3882                                goto fail;
3883                        }
3884                        if (recon_state.nr_caps) {
3885                                err = send_reconnect_partial(&recon_state);
3886                                if (err)
3887                                        goto fail;
3888                        }
3889                        recon_state.msg_version = 5;
3890                }
3891        }
3892
3893        err = encode_snap_realms(mdsc, &recon_state);
3894        if (err < 0)
3895                goto fail;
3896
3897        if (recon_state.msg_version >= 5) {
3898                err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3899                if (err < 0)
3900                        goto fail;
3901        }
3902
3903        if (recon_state.nr_caps || recon_state.nr_realms) {
3904                struct page *page =
3905                        list_first_entry(&recon_state.pagelist->head,
3906                                        struct page, lru);
3907                __le32 *addr = kmap_atomic(page);
3908                if (recon_state.nr_caps) {
3909                        WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3910                        *addr = cpu_to_le32(recon_state.nr_caps);
3911                } else if (recon_state.msg_version >= 4) {
3912                        *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3913                }
3914                kunmap_atomic(addr);
3915        }
3916
3917        reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3918        if (recon_state.msg_version >= 4)
3919                reply->hdr.compat_version = cpu_to_le16(4);
3920
3921        reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3922        ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3923
3924        ceph_con_send(&session->s_con, reply);
3925
3926        mutex_unlock(&session->s_mutex);
3927
3928        mutex_lock(&mdsc->mutex);
3929        __wake_requests(mdsc, &session->s_waiting);
3930        mutex_unlock(&mdsc->mutex);
3931
3932        up_read(&mdsc->snap_rwsem);
3933        ceph_pagelist_release(recon_state.pagelist);
3934        return;
3935
3936fail:
3937        ceph_msg_put(reply);
3938        up_read(&mdsc->snap_rwsem);
3939        mutex_unlock(&session->s_mutex);
3940fail_nomsg:
3941        ceph_pagelist_release(recon_state.pagelist);
3942fail_nopagelist:
3943        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3944        return;
3945}
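
/*
 * Editorial note (not in the original source): the resulting reconnect
 * payload is, schematically:
 *
 *	u32 nr_caps		(patched in after the cap walk)
 *	per-cap records		(reconnect_caps_cb)
 *	(v4+) u32 nr_realms
 *	per-realm records	(encode_snap_realms)
 *	(v5) u8 more		1 in partial messages, 0 in the final one
 */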
3946
3948/*
3949 * compare old and new mdsmaps, kicking requests
3950 * and closing out old connections as necessary
3951 *
3952 * called under mdsc->mutex.
3953 */
3954static void check_new_map(struct ceph_mds_client *mdsc,
3955                          struct ceph_mdsmap *newmap,
3956                          struct ceph_mdsmap *oldmap)
3957{
3958        int i;
3959        int oldstate, newstate;
3960        struct ceph_mds_session *s;
3961
3962        dout("check_new_map new %u old %u\n",
3963             newmap->m_epoch, oldmap->m_epoch);
3964
3965        for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
3966                if (!mdsc->sessions[i])
3967                        continue;
3968                s = mdsc->sessions[i];
3969                oldstate = ceph_mdsmap_get_state(oldmap, i);
3970                newstate = ceph_mdsmap_get_state(newmap, i);
3971
3972                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3973                     i, ceph_mds_state_name(oldstate),
3974                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3975                     ceph_mds_state_name(newstate),
3976                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3977                     ceph_session_state_name(s->s_state));
3978
3979                if (i >= newmap->possible_max_rank) {
3980                        /* force close session for stopped mds */
3981                        ceph_get_mds_session(s);
3982                        __unregister_session(mdsc, s);
3983                        __wake_requests(mdsc, &s->s_waiting);
3984                        mutex_unlock(&mdsc->mutex);
3985
3986                        mutex_lock(&s->s_mutex);
3987                        cleanup_session_requests(mdsc, s);
3988                        remove_session_caps(s);
3989                        mutex_unlock(&s->s_mutex);
3990
3991                        ceph_put_mds_session(s);
3992
3993                        mutex_lock(&mdsc->mutex);
3994                        kick_requests(mdsc, i);
3995                        continue;
3996                }
3997
3998                if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
3999                           ceph_mdsmap_get_addr(newmap, i),
4000                           sizeof(struct ceph_entity_addr))) {
4001                        /* just close it */
4002                        mutex_unlock(&mdsc->mutex);
4003                        mutex_lock(&s->s_mutex);
4004                        mutex_lock(&mdsc->mutex);
4005                        ceph_con_close(&s->s_con);
4006                        mutex_unlock(&s->s_mutex);
4007                        s->s_state = CEPH_MDS_SESSION_RESTARTING;
4008                } else if (oldstate == newstate) {
4009                        continue;  /* nothing new with this mds */
4010                }
4011
4012                /*
4013                 * send reconnect?
4014                 */
4015                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4016                    newstate >= CEPH_MDS_STATE_RECONNECT) {
4017                        mutex_unlock(&mdsc->mutex);
4018                        send_mds_reconnect(mdsc, s);
4019                        mutex_lock(&mdsc->mutex);
4020                }
4021
4022                /*
4023                 * kick requests on any mds that has gone active.
4024                 */
4025                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4026                    newstate >= CEPH_MDS_STATE_ACTIVE) {
4027                        if (oldstate != CEPH_MDS_STATE_CREATING &&
4028                            oldstate != CEPH_MDS_STATE_STARTING)
4029                                pr_info("mds%d recovery completed\n", s->s_mds);
4030                        kick_requests(mdsc, i);
4031                        mutex_unlock(&mdsc->mutex);
4032                        mutex_lock(&s->s_mutex);
4033                        mutex_lock(&mdsc->mutex);
4034                        ceph_kick_flushing_caps(mdsc, s);
4035                        mutex_unlock(&s->s_mutex);
4036                        wake_up_session_caps(s, RECONNECT);
4037                }
4038        }
4039
4040        for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4041                s = mdsc->sessions[i];
4042                if (!s)
4043                        continue;
4044                if (!ceph_mdsmap_is_laggy(newmap, i))
4045                        continue;
4046                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4047                    s->s_state == CEPH_MDS_SESSION_HUNG ||
4048                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4049                        dout(" connecting to export targets of laggy mds%d\n",
4050                             i);
4051                        __open_export_target_sessions(mdsc, s);
4052                }
4053        }
4054}
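
/*
 * Editorial note (not in the original source): the unlock/lock dance
 * above exists because a session's s_mutex must be taken before
 * mdsc->mutex.  check_new_map() runs under mdsc->mutex, so it has to
 * drop it before taking s_mutex, then re-take it afterwards.
 */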
4055
4058/*
4059 * leases
4060 */
4061
4062/*
4063 * caller must hold session s_mutex, dentry->d_lock
4064 */
4065void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4066{
4067        struct ceph_dentry_info *di = ceph_dentry(dentry);
4068
4069        ceph_put_mds_session(di->lease_session);
4070        di->lease_session = NULL;
4071}
4072
4073static void handle_lease(struct ceph_mds_client *mdsc,
4074                         struct ceph_mds_session *session,
4075                         struct ceph_msg *msg)
4076{
4077        struct super_block *sb = mdsc->fsc->sb;
4078        struct inode *inode;
4079        struct dentry *parent, *dentry;
4080        struct ceph_dentry_info *di;
4081        int mds = session->s_mds;
4082        struct ceph_mds_lease *h = msg->front.iov_base;
4083        u32 seq;
4084        struct ceph_vino vino;
4085        struct qstr dname;
4086        int release = 0;
4087
4088        dout("handle_lease from mds%d\n", mds);
4089
4090        /* decode */
4091        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4092                goto bad;
4093        vino.ino = le64_to_cpu(h->ino);
4094        vino.snap = CEPH_NOSNAP;
4095        seq = le32_to_cpu(h->seq);
4096        dname.len = get_unaligned_le32(h + 1);
4097        if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4098                goto bad;
4099        dname.name = (void *)(h + 1) + sizeof(u32);
4100
4101        /* lookup inode */
4102        inode = ceph_find_inode(sb, vino);
4103        dout("handle_lease %s, ino %llx %p %.*s\n",
4104             ceph_lease_op_name(h->action), vino.ino, inode,
4105             dname.len, dname.name);
4106
4107        mutex_lock(&session->s_mutex);
4108        session->s_seq++;
4109
4110        if (!inode) {
4111                dout("handle_lease no inode %llx\n", vino.ino);
4112                goto release;
4113        }
4114
4115        /* dentry */
4116        parent = d_find_alias(inode);
4117        if (!parent) {
4118                dout("no parent dentry on inode %p\n", inode);
4119                WARN_ON(1);
4120                goto release;  /* hrm... */
4121        }
4122        dname.hash = full_name_hash(parent, dname.name, dname.len);
4123        dentry = d_lookup(parent, &dname);
4124        dput(parent);
4125        if (!dentry)
4126                goto release;
4127
4128        spin_lock(&dentry->d_lock);
4129        di = ceph_dentry(dentry);
4130        switch (h->action) {
4131        case CEPH_MDS_LEASE_REVOKE:
4132                if (di->lease_session == session) {
4133                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4134                                h->seq = cpu_to_le32(di->lease_seq);
4135                        __ceph_mdsc_drop_dentry_lease(dentry);
4136                }
4137                release = 1;
4138                break;
4139
4140        case CEPH_MDS_LEASE_RENEW:
4141                if (di->lease_session == session &&
4142                    di->lease_gen == session->s_cap_gen &&
4143                    di->lease_renew_from &&
4144                    di->lease_renew_after == 0) {
4145                        unsigned long duration =
4146                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4147
4148                        di->lease_seq = seq;
4149                        di->time = di->lease_renew_from + duration;
4150                        di->lease_renew_after = di->lease_renew_from +
4151                                (duration >> 1);
4152                        di->lease_renew_from = 0;
4153                }
4154                break;
4155        }
4156        spin_unlock(&dentry->d_lock);
4157        dput(dentry);
4158
4159        if (!release)
4160                goto out;
4161
4162release:
4163        /* let's just reuse the same message */
4164        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4165        ceph_msg_get(msg);
4166        ceph_con_send(&session->s_con, msg);
4167
4168out:
4169        mutex_unlock(&session->s_mutex);
4170        /* avoid calling iput_final() in mds dispatch threads */
4171        ceph_async_iput(inode);
4172        return;
4173
4174bad:
4175        pr_err("corrupt lease message\n");
4176        ceph_msg_dump(msg);
4177}
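
/*
 * Editorial note (not in the original source): the lease message body
 * handled above is a struct ceph_mds_lease followed directly by the
 * dentry name:
 *
 *	struct ceph_mds_lease h		(action, ino, seq, ...)
 *	u32 dname.len
 *	dname.len bytes			(the name, not NUL-terminated)
 *
 * ceph_mdsc_lease_send_msg() below builds exactly this layout.
 */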
4178
4179void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4180                              struct dentry *dentry, char action,
4181                              u32 seq)
4182{
4183        struct ceph_msg *msg;
4184        struct ceph_mds_lease *lease;
4185        struct inode *dir;
4186        int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4187
4188        dout("lease_send_msg dentry %p %s to mds%d\n",
4189             dentry, ceph_lease_op_name(action), session->s_mds);
4190
4191        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4192        if (!msg)
4193                return;
4194        lease = msg->front.iov_base;
4195        lease->action = action;
4196        lease->seq = cpu_to_le32(seq);
4197
4198        spin_lock(&dentry->d_lock);
4199        dir = d_inode(dentry->d_parent);
4200        lease->ino = cpu_to_le64(ceph_ino(dir));
4201        lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4202
4203        put_unaligned_le32(dentry->d_name.len, lease + 1);
4204        memcpy((void *)(lease + 1) + 4,
4205               dentry->d_name.name, dentry->d_name.len);
4206        spin_unlock(&dentry->d_lock);
4207        /*
4208         * if this is a preemptive lease RELEASE, no need to
4209         * flush request stream, since the actual request will
4210         * soon follow.
4211         */
4212        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4213
4214        ceph_con_send(&session->s_con, msg);
4215}
4216
4217/*
4218 * lock/unlock each session, to wait for ongoing session activity to finish
4219 */
4220static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4221{
4222        int i;
4223
4224        mutex_lock(&mdsc->mutex);
4225        for (i = 0; i < mdsc->max_sessions; i++) {
4226                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4227                if (!s)
4228                        continue;
4229                mutex_unlock(&mdsc->mutex);
4230                mutex_lock(&s->s_mutex);
4231                mutex_unlock(&s->s_mutex);
4232                ceph_put_mds_session(s);
4233                mutex_lock(&mdsc->mutex);
4234        }
4235        mutex_unlock(&mdsc->mutex);
4236}
4237
4238static void maybe_recover_session(struct ceph_mds_client *mdsc)
4239{
4240        struct ceph_fs_client *fsc = mdsc->fsc;
4241
4242        if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4243                return;
4244
4245        if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4246                return;
4247
4248        if (!READ_ONCE(fsc->blacklisted))
4249                return;
4250
4251        if (fsc->last_auto_reconnect &&
4252            time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4253                return;
4254
4255        pr_info("auto reconnect after blacklisted\n");
4256        fsc->last_auto_reconnect = jiffies;
4257        ceph_force_reconnect(fsc->sb);
4258}
4259
4260/*
4261 * delayed work -- periodically trim expired leases, renew caps with mds
4262 */
4263static void schedule_delayed(struct ceph_mds_client *mdsc)
4264{
4265        int delay = 5;
4266        unsigned hz = round_jiffies_relative(HZ * delay);
4267        schedule_delayed_work(&mdsc->delayed_work, hz);
4268}
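
/*
 * Editorial example (not in the original source): with the delay above,
 * delayed_work() runs roughly every 5 seconds (rounded by
 * round_jiffies_relative() to batch timer wakeups).  Caps are renewed
 * every m_session_timeout / 4; assuming the usual 60-second MDS session
 * timeout, that is about every 15 seconds.
 */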

static void delayed_work(struct work_struct *work)
{
        int i;
        struct ceph_mds_client *mdsc =
                container_of(work, struct ceph_mds_client, delayed_work.work);
        int renew_interval;
        int renew_caps;

        dout("mdsc delayed_work\n");

        mutex_lock(&mdsc->mutex);
        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
                                   mdsc->last_renew_caps);
        if (renew_caps)
                mdsc->last_renew_caps = jiffies;

        for (i = 0; i < mdsc->max_sessions; i++) {
                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
                if (!s)
                        continue;
                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
                        dout("resending session close request for mds%d\n",
                             s->s_mds);
                        request_close_session(mdsc, s);
                        ceph_put_mds_session(s);
                        continue;
                }
                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
                                s->s_state = CEPH_MDS_SESSION_HUNG;
                                pr_info("mds%d hung\n", s->s_mds);
                        }
                }
                if (s->s_state == CEPH_MDS_SESSION_NEW ||
                    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
                    s->s_state == CEPH_MDS_SESSION_REJECTED) {
                        /* this MDS has failed or is recovering; just wait */
                        ceph_put_mds_session(s);
                        continue;
                }
                mutex_unlock(&mdsc->mutex);

                mutex_lock(&s->s_mutex);
                if (renew_caps)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG)
                        ceph_send_cap_releases(mdsc, s);
                mutex_unlock(&s->s_mutex);
                ceph_put_mds_session(s);

                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);

        ceph_check_delayed_caps(mdsc);

        ceph_queue_cap_reclaim_work(mdsc);

        ceph_trim_snapid_map(mdsc);

        maybe_recover_session(mdsc);

        schedule_delayed(mdsc);
}
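
/*
 * Renewal cadence: renew_interval is a quarter of the mdsmap session
 * timeout, so several renewal attempts can be lost before the MDS
 * would consider the session stale.  Ticks that don't renew caps
 * still send a connection keepalive.
 */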

int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
        struct ceph_mds_client *mdsc;

        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
        if (!mdsc)
                return -ENOMEM;
        mdsc->fsc = fsc;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
        if (!mdsc->mdsmap) {
                kfree(mdsc);
                return -ENOMEM;
        }

        fsc->mdsc = mdsc;
        init_completion(&mdsc->safe_umount_waiters);
        init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->sessions = NULL;
        atomic_set(&mdsc->num_sessions, 0);
        mdsc->max_sessions = 0;
        mdsc->stopping = 0;
        atomic64_set(&mdsc->quotarealms_count, 0);
        mdsc->quotarealms_inodes = RB_ROOT;
        mutex_init(&mdsc->quotarealms_inodes_mutex);
        mdsc->last_snap_seq = 0;
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
        mdsc->num_snap_realms = 0;
        spin_lock_init(&mdsc->snap_empty_lock);
        mdsc->last_tid = 0;
        mdsc->oldest_tid = 0;
        mdsc->request_tree = RB_ROOT;
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
        mdsc->last_renew_caps = jiffies;
        INIT_LIST_HEAD(&mdsc->cap_delay_list);
        INIT_LIST_HEAD(&mdsc->cap_wait_list);
        spin_lock_init(&mdsc->cap_delay_lock);
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
        mdsc->last_cap_flush_tid = 1;
        INIT_LIST_HEAD(&mdsc->cap_flush_list);
        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
        mdsc->num_cap_flushing = 0;
        spin_lock_init(&mdsc->cap_dirty_lock);
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
        atomic_set(&mdsc->cap_reclaim_pending, 0);

        spin_lock_init(&mdsc->dentry_list_lock);
        INIT_LIST_HEAD(&mdsc->dentry_leases);
        INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

        ceph_caps_init(mdsc);
        ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

        spin_lock_init(&mdsc->snapid_map_lock);
        mdsc->snapid_map_tree = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snapid_map_lru);

        init_rwsem(&mdsc->pool_perm_rwsem);
        mdsc->pool_perm_tree = RB_ROOT;

        strscpy(mdsc->nodename, utsname()->nodename,
                sizeof(mdsc->nodename));
        return 0;
}
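
/*
 * A caller pairs init with destroy roughly like this (illustrative
 * sketch only; the real call sites live in the fs client setup and
 * teardown paths, e.g. fs/ceph/super.c):
 *
 *      err = ceph_mdsc_init(fsc);
 *      if (err < 0)
 *              goto out;               // hypothetical error label
 *      ...
 *      ceph_mdsc_destroy(fsc);         // safe even if fsc->mdsc is NULL
 */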

/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
        struct ceph_options *opts = mdsc->fsc->client->options;
        struct ceph_mds_request *req;

        mutex_lock(&mdsc->mutex);
        if (__get_oldest_req(mdsc)) {
                mutex_unlock(&mdsc->mutex);

                dout("wait_requests waiting for requests\n");
                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
                                    ceph_timeout_jiffies(opts->mount_timeout));

                /* tear down remaining requests */
                mutex_lock(&mdsc->mutex);
                while ((req = __get_oldest_req(mdsc))) {
                        dout("wait_requests timed out on tid %llu\n",
                             req->r_tid);
                        list_del_init(&req->r_wait);
                        __unregister_request(mdsc, req);
                }
        }
        mutex_unlock(&mdsc->mutex);
        dout("wait_requests done\n");
}

/*
 * called before the mount goes read-only, and before dentries are torn
 * down.  (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
        dout("pre_umount\n");
        mdsc->stopping = 1;

        lock_unlock_sessions(mdsc);
        ceph_flush_dirty_caps(mdsc);
        wait_requests(mdsc);

        /*
         * wait for reply handlers to drop their request refs and
         * their inode/dcache refs
         */
        ceph_msgr_flush();

        ceph_cleanup_quotarealms_inodes(mdsc);
}

/*
 * wait for all outstanding write requests to the mds to be flushed
 * (i.e. reported safe).
 */
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
{
        struct ceph_mds_request *req = NULL, *nextreq;
        struct rb_node *n;

        mutex_lock(&mdsc->mutex);
        dout("wait_unsafe_requests want %lld\n", want_tid);
restart:
        req = __get_oldest_req(mdsc);
        while (req && req->r_tid <= want_tid) {
                /* find next request */
                n = rb_next(&req->r_node);
                if (n)
                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
                else
                        nextreq = NULL;
                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
                    (req->r_op & CEPH_MDS_OP_WRITE)) {
                        /* write op */
                        ceph_mdsc_get_request(req);
                        if (nextreq)
                                ceph_mdsc_get_request(nextreq);
                        mutex_unlock(&mdsc->mutex);
                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
                             req->r_tid, want_tid);
                        wait_for_completion(&req->r_safe_completion);
                        mutex_lock(&mdsc->mutex);
                        ceph_mdsc_put_request(req);
                        if (!nextreq)
                                break;  /* there was no next request, so we're done */
                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
                                /* next request was removed from tree */
                                ceph_mdsc_put_request(nextreq);
                                goto restart;
                        }
                        ceph_mdsc_put_request(nextreq);  /* won't go away */
                }
                req = nextreq;
        }
        mutex_unlock(&mdsc->mutex);
        dout("wait_unsafe_requests done\n");
}
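
/*
 * Walking a tree we drop the lock on: before sleeping, the loop above
 * takes a reference on the *next* request so that struct cannot be
 * freed while mdsc->mutex is released.  If that request is
 * unregistered in the meantime, its tree node is cleared (so
 * RB_EMPTY_NODE() becomes true) and the walk restarts from the oldest
 * request rather than following a stale link.
 */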

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
        u64 want_tid, want_flush;

        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                return;

        dout("sync\n");
        mutex_lock(&mdsc->mutex);
        want_tid = mdsc->last_tid;
        mutex_unlock(&mdsc->mutex);

        ceph_flush_dirty_caps(mdsc);
        spin_lock(&mdsc->cap_dirty_lock);
        want_flush = mdsc->last_cap_flush_tid;
        if (!list_empty(&mdsc->cap_flush_list)) {
                struct ceph_cap_flush *cf =
                        list_last_entry(&mdsc->cap_flush_list,
                                        struct ceph_cap_flush, g_list);
                cf->wake = true;
        }
        spin_unlock(&mdsc->cap_dirty_lock);

        dout("sync want tid %lld flush_seq %lld\n",
             want_tid, want_flush);

        wait_unsafe_requests(mdsc, want_tid);
        wait_caps_flush(mdsc, want_flush);
}
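
/*
 * ceph_mdsc_sync() samples last_tid up front, so it only waits for
 * requests submitted before the sync was issued; the cap-flush tid is
 * sampled after ceph_flush_dirty_caps() so the flushes just queued are
 * included.  It is reached from the VFS sync path (sync_fs; see the fs
 * client's super operations).
 */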

/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                return true;
        return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after the superblock is read-only.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
        struct ceph_options *opts = mdsc->fsc->client->options;
        struct ceph_mds_session *session;
        int i;
        int skipped = 0;

        dout("close_sessions\n");

        /* close sessions */
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                session = __ceph_lookup_mds_session(mdsc, i);
                if (!session)
                        continue;
                mutex_unlock(&mdsc->mutex);
                mutex_lock(&session->s_mutex);
                if (__close_session(mdsc, session) <= 0)
                        skipped++;
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);
                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);

        dout("waiting for sessions to close\n");
        wait_event_timeout(mdsc->session_close_wq,
                           done_closing_sessions(mdsc, skipped),
                           ceph_timeout_jiffies(opts->mount_timeout));

        /* tear down remaining sessions */
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i]) {
                        session = ceph_get_mds_session(mdsc->sessions[i]);
                        __unregister_session(mdsc, session);
                        mutex_unlock(&mdsc->mutex);
                        mutex_lock(&session->s_mutex);
                        remove_session_caps(session);
                        mutex_unlock(&session->s_mutex);
                        ceph_put_mds_session(session);
                        mutex_lock(&mdsc->mutex);
                }
        }
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
        mutex_unlock(&mdsc->mutex);

        ceph_cleanup_snapid_map(mdsc);
        ceph_cleanup_empty_realms(mdsc);

        cancel_work_sync(&mdsc->cap_reclaim_work);
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

        dout("stopped\n");
}
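
/*
 * Shutdown here is two-phase: first ask each MDS to close its session
 * and wait up to mount_timeout for the replies, then forcibly
 * unregister whatever is left and drop its caps.
 * done_closing_sessions() tolerates the sessions we could not even ask
 * (the "skipped" count).
 */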

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
        struct ceph_mds_session *session;
        int mds;

        dout("force umount\n");

        mutex_lock(&mdsc->mutex);
        for (mds = 0; mds < mdsc->max_sessions; mds++) {
                session = __ceph_lookup_mds_session(mdsc, mds);
                if (!session)
                        continue;

                if (session->s_state == CEPH_MDS_SESSION_REJECTED)
                        __unregister_session(mdsc, session);
                __wake_requests(mdsc, &session->s_waiting);
                mutex_unlock(&mdsc->mutex);

                mutex_lock(&session->s_mutex);
                __close_session(mdsc, session);
                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
                        cleanup_session_requests(mdsc, session);
                        remove_session_caps(session);
                }
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);

                mutex_lock(&mdsc->mutex);
                kick_requests(mdsc, mds);
        }
        __wake_requests(mdsc, &mdsc->waiting_for_map);
        mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
        ceph_caps_finalize(mdsc);
        ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
        struct ceph_mds_client *mdsc = fsc->mdsc;

        dout("mdsc_destroy %p\n", mdsc);

        if (!mdsc)
                return;

        /* flush out any connection work with references to us */
        ceph_msgr_flush();

        ceph_mdsc_stop(mdsc);

        fsc->mdsc = NULL;
        kfree(mdsc);
        dout("mdsc_destroy %p done\n", mdsc);
}

void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        struct ceph_fs_client *fsc = mdsc->fsc;
        const char *mds_namespace = fsc->mount_options->mds_namespace;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        u32 epoch;
        u32 map_len;
        u32 num_fs;
        u32 mount_fscid = (u32)-1;
        u8 struct_v, struct_cv;
        int err = -EINVAL;

        ceph_decode_need(&p, end, sizeof(u32), bad);
        epoch = ceph_decode_32(&p);

        dout("handle_fsmap epoch %u\n", epoch);

        ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
        struct_v = ceph_decode_8(&p);
        struct_cv = ceph_decode_8(&p);
        map_len = ceph_decode_32(&p);

        ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
        p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */

        num_fs = ceph_decode_32(&p);
        while (num_fs-- > 0) {
                void *info_p, *info_end;
                u32 info_len;
                u8 info_v, info_cv;
                u32 fscid, namelen;

                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
                info_v = ceph_decode_8(&p);
                info_cv = ceph_decode_8(&p);
                info_len = ceph_decode_32(&p);
                ceph_decode_need(&p, end, info_len, bad);
                info_p = p;
                info_end = p + info_len;
                p = info_end;

                ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
                fscid = ceph_decode_32(&info_p);
                namelen = ceph_decode_32(&info_p);
                ceph_decode_need(&info_p, info_end, namelen, bad);

                if (mds_namespace &&
                    strlen(mds_namespace) == namelen &&
                    !strncmp(mds_namespace, (char *)info_p, namelen)) {
                        mount_fscid = fscid;
                        break;
                }
        }

        ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
        if (mount_fscid != (u32)-1) {
                fsc->client->monc.fs_cluster_id = mount_fscid;
                ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
                                   0, true);
                ceph_monc_renew_subs(&fsc->client->monc);
        } else {
                err = -ENOENT;
                goto err_out;
        }
        return;

bad:
        pr_err("error decoding fsmap\n");
err_out:
        mutex_lock(&mdsc->mutex);
        mdsc->mdsmap_err = err;
        __wake_requests(mdsc, &mdsc->waiting_for_map);
        mutex_unlock(&mdsc->mutex);
}
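
/*
 * The fsmap message, as the decoder above consumes it:
 *
 *      u32 epoch
 *      u8  struct_v, u8 struct_compat, u32 struct_len
 *      u32 epoch (again), u32 legacy_client_fscid   (both skipped)
 *      u32 num_fs
 *      num_fs times:
 *              u8 info_v, u8 info_cv, u32 info_len
 *              u32 fscid, u32 namelen, name bytes, ...
 *
 * Only fscid and the name are examined here, to match mds_namespace;
 * the rest of each entry is skipped via info_len.
 */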

/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        u32 epoch;
        u32 maplen;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        struct ceph_mdsmap *newmap, *oldmap;
        struct ceph_fsid fsid;
        int err = -EINVAL;

        ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
                return;
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

        /* do we need it? */
        mutex_lock(&mdsc->mutex);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                dout("handle_map epoch %u <= our %u\n",
                     epoch, mdsc->mdsmap->m_epoch);
                mutex_unlock(&mdsc->mutex);
                return;
        }

        newmap = ceph_mdsmap_decode(&p, end);
        if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad_unlock;
        }

        /* swap into place */
        if (mdsc->mdsmap) {
                oldmap = mdsc->mdsmap;
                mdsc->mdsmap = newmap;
                check_new_map(mdsc, newmap, oldmap);
                ceph_mdsmap_destroy(oldmap);
        } else {
                mdsc->mdsmap = newmap;  /* first mds map */
        }
        mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
                                       MAX_LFS_FILESIZE);

        __wake_requests(mdsc, &mdsc->waiting_for_map);
        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
                          mdsc->mdsmap->m_epoch);

        mutex_unlock(&mdsc->mutex);
        schedule_delayed(mdsc);
        return;

bad_unlock:
        mutex_unlock(&mdsc->mutex);
bad:
        pr_err("error decoding mdsmap %d\n", err);
}
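
/*
 * Map epochs only move forward: anything at or below the epoch we
 * already hold is ignored, so replayed or reordered map messages are
 * harmless.  Waiters queued on waiting_for_map are kicked once the
 * new map is in place.
 */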

static struct ceph_connection *con_get(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        if (ceph_get_mds_session(s))
                return con;
        return NULL;
}

static void con_put(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void peer_reset(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;

        pr_warn("mds%d closed our session\n", s->s_mds);
        send_mds_reconnect(mdsc, s);
}

static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        int type = le16_to_cpu(msg->hdr.type);

        mutex_lock(&mdsc->mutex);
        if (__verify_registered_session(mdsc, s) < 0) {
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
        mutex_unlock(&mdsc->mutex);

        switch (type) {
        case CEPH_MSG_MDS_MAP:
                ceph_mdsc_handle_mdsmap(mdsc, msg);
                break;
        case CEPH_MSG_FS_MAP_USER:
                ceph_mdsc_handle_fsmap(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);
                break;
        case CEPH_MSG_CLIENT_REPLY:
                handle_reply(s, msg);
                break;
        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
                handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
                ceph_handle_caps(s, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_handle_snap(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_LEASE:
                handle_lease(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_QUOTA:
                ceph_handle_quota(mdsc, s, msg);
                break;
        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
out:
        ceph_msg_put(msg);
}
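
/*
 * dispatch() owns the message reference it is handed: every path,
 * including the unregistered-session bailout and the unknown-type
 * default, funnels through ceph_msg_put().
 */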

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
                                        int *proto, int force_new)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;

        if (force_new && auth->authorizer) {
                ceph_auth_destroy_authorizer(auth->authorizer);
                auth->authorizer = NULL;
        }
        if (!auth->authorizer) {
                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        } else {
                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        }
        *proto = ac->protocol;

        return auth;
}
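
/*
 * force_new is set by the messenger when the server rejected the
 * previous authorizer (e.g. after ticket rotation); in that case the
 * old authorizer is destroyed and rebuilt rather than merely updated.
 */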

static int add_authorizer_challenge(struct ceph_connection *con,
                                    void *challenge_buf, int challenge_buf_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
                                            challenge_buf, challenge_buf_len);
}

static int verify_authorizer_reply(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
                                struct ceph_msg_header *hdr, int *skip)
{
        struct ceph_msg *msg;
        int type = (int) le16_to_cpu(hdr->type);
        int front_len = (int) le32_to_cpu(hdr->front_len);

        if (con->in_msg)
                return con->in_msg;

        *skip = 0;
        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
        if (!msg) {
                pr_err("unable to allocate msg type %d len %d\n",
                       type, front_len);
                return NULL;
        }

        return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .add_authorizer_challenge = add_authorizer_challenge,
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .peer_reset = peer_reset,
        .alloc_msg = mds_alloc_msg,
        .sign_message = mds_sign_message,
        .check_message_signature = mds_check_message_signature,
};

/* eof */