linux/net/ceph/mon_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

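/*
 * As a rough, hypothetical sketch (not code from this file; error handling
 * trimmed and the ceph_client pointer assumed to come from ceph_create_client()
 * in net/ceph/ceph_common.c), a libceph consumer drives the mon_client like so:
 *
 *	struct ceph_client *client = ...;
 *	int err;
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	if (err)
 *		return err;
 *	err = ceph_monc_open_session(&client->monc);	// pick a mon, start auth
 *	...
 *	ceph_monc_stop(&client->monc);			// tear down on shutdown
 *
 * See net/ceph/ceph_common.c for the real call sites.
 */
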
static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
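/*
 * The blob is laid out as follows, mirroring the ceph_decode_* calls in the
 * function body:
 *
 *	u32 len				(length of the payload that follows)
 *	u16 version
 *	ceph_fsid fsid
 *	u32 epoch
 *	u32 num_mon
 *	ceph_entity_inst mon_inst[num_mon]
 */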
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
        struct ceph_monmap *m = NULL;
        int i, err = -EINVAL;
        struct ceph_fsid fsid;
        u32 epoch, num_mon;
        u16 version;
        u32 len;

        ceph_decode_32_safe(&p, end, len, bad);
        ceph_decode_need(&p, end, len, bad);

        dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

        ceph_decode_16_safe(&p, end, version, bad);

        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(&p);

        num_mon = ceph_decode_32(&p);
        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

        if (num_mon >= CEPH_MAX_MON)
                goto bad;
        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
        if (m == NULL)
                return ERR_PTR(-ENOMEM);
        m->fsid = fsid;
        m->epoch = epoch;
        m->num_mon = num_mon;
        ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
        for (i = 0; i < num_mon; i++)
                ceph_decode_addr(&m->mon_inst[i].addr);

        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
             m->num_mon);
        for (i = 0; i < m->num_mon; i++)
                dout("monmap_decode  mon%d is %s\n", i,
                     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
        return m;

bad:
        dout("monmap_decode failed with %d\n", err);
        kfree(m);
        return ERR_PTR(err);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
        int i;

        for (i = 0; i < m->num_mon; i++)
                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
                        return 1;
        return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_get(monc->m_auth);  /* keep our ref */
        ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        dout("__close_session closing mon%d\n", monc->cur_mon);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_revoke_incoming(monc->m_auth_reply);
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);

        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
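/*
 * For example, with num_mon == 3 and cur_mon == 1, n is drawn from {0, 1}
 * and any value >= the old slot is bumped by one, so the result is mon0 or
 * mon2 -- never the monitor we just gave up on.
 */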
static void pick_new_mon(struct ceph_mon_client *monc)
{
        int old_mon = monc->cur_mon;

        BUG_ON(monc->monmap->num_mon < 1);

        if (monc->monmap->num_mon == 1) {
                monc->cur_mon = 0;
        } else {
                int max = monc->monmap->num_mon;
                int o = -1;
                int n;

                if (monc->cur_mon >= 0) {
                        if (monc->cur_mon < monc->monmap->num_mon)
                                o = monc->cur_mon;
                        if (o >= 0)
                                max--;
                }

                n = prandom_u32() % max;
                if (o >= 0 && n >= o)
                        n++;

                monc->cur_mon = n;
        }

        dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
             monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
        int ret;

        pick_new_mon(monc);

        monc->hunting = true;
        if (monc->had_a_connection) {
                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
        }

        monc->sub_renew_after = jiffies; /* i.e., expired */
        monc->sub_renew_sent = 0;

        dout("%s opening mon%d\n", __func__, monc->cur_mon);
        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                      &monc->monmap->mon_inst[monc->cur_mon].addr);

        /*
         * send an initial keepalive to ensure our timestamp is valid
         * by the time we are in an OPENED state
         */
        ceph_con_keepalive(&monc->con);

        /* initiate authentication handshake */
        ret = ceph_auth_build_hello(monc->auth,
                                    monc->m_auth->front.iov_base,
                                    monc->m_auth->front_alloc_len);
        BUG_ON(ret <= 0);
        __send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
        if (!monc->hunting)
                pr_info("mon%d %s session lost, hunting for new mon\n",
                    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));

        __close_session(monc);
        __open_session(monc);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
        unsigned long delay;

        if (monc->hunting)
                delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
        else
                delay = CEPH_MONC_PING_INTERVAL;

        dout("__schedule_delayed after %lu\n", delay);
        mod_delayed_work(system_wq, &monc->delayed_work,
                         round_jiffies_relative(delay));
}
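
/*
 * While hunting, the delay above grows geometrically: __open_session()
 * multiplies hunt_mult by CEPH_MONC_HUNT_BACKOFF on every attempt after the
 * first successful connection (capped at CEPH_MONC_HUNT_MAX_MULT), and
 * finish_hunting() halves it again once a session is established.
 */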

const char *ceph_sub_str[] = {
        [CEPH_SUB_MDSMAP] = "mdsmap",
        [CEPH_SUB_MONMAP] = "monmap",
        [CEPH_SUB_OSDMAP] = "osdmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
        struct ceph_msg *msg = monc->m_subscribe;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        int num = 0;
        int i;

        dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

        BUG_ON(monc->cur_mon < 0);

        if (!monc->sub_renew_sent)
                monc->sub_renew_sent = jiffies | 1; /* never 0 */

        msg->hdr.version = cpu_to_le16(2);

        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                if (monc->subs[i].want)
                        num++;
        }
        BUG_ON(num < 1); /* monmap sub is always there */
        ceph_encode_32(&p, num);
        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                const char *s = ceph_sub_str[i];

                if (!monc->subs[i].want)
                        continue;

                dout("%s %s start %llu flags 0x%x\n", __func__, s,
                     le64_to_cpu(monc->subs[i].item.start),
                     monc->subs[i].item.flags);
                ceph_encode_string(&p, end, s, strlen(s));
                memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
                p += sizeof(monc->subs[i].item);
        }

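        /*
         * Sanity check on the encoded length: "num" is 4 bytes and each
         * encoded sub is a length-prefixed name (4 + 6 bytes for the names
         * above) plus what should be a 9-byte packed subscribe item, i.e.
         * 19 bytes per sub.  With the 96-byte front allocated in
         * ceph_monc_init() that leaves 96 - 4 - 3*19 = 35 bytes of slack
         * when all ARRAY_SIZE(monc->subs) subs are wanted.
         */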
        BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
        ceph_msg_revoke(msg);
        ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->sub_renew_sent) {
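                /*
                 * Schedule the renewal roughly halfway through the
                 * duration the monitor granted us, so the subscription
                 * never lapses.
                 */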
                monc->sub_renew_after = monc->sub_renew_sent +
                                            (seconds >> 1) * HZ - 1;
                dout("%s sent %lu duration %d renew after %lu\n", __func__,
                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
                monc->sub_renew_sent = 0;
        } else {
                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
                     monc->sub_renew_sent, monc->sub_renew_after);
        }
        mutex_unlock(&monc->mutex);
        return;
bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
                                 u32 epoch, bool continuous)
{
        __le64 start = cpu_to_le64(epoch);
        u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

        dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
             epoch, continuous);

        if (monc->subs[sub].want &&
            monc->subs[sub].item.start == start &&
            monc->subs[sub].item.flags == flags)
                return false;

        monc->subs[sub].item.start = start;
        monc->subs[sub].item.flags = flags;
        monc->subs[sub].want = true;

        return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
                        bool continuous)
{
        bool need_request;

        mutex_lock(&monc->mutex);
        need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
        mutex_unlock(&monc->mutex);

        return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
                                u32 epoch)
{
        dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

        if (monc->subs[sub].want) {
                if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
                        monc->subs[sub].want = false;
                else
                        monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
        }

        monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);
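
/*
 * A hypothetical map consumer (not code from this file) would pair the two
 * exported calls above roughly like this, with "monc" and the epochs standing
 * in for its own state:
 *
 *	// register interest in every mdsmap from "have + 1" onward; a true
 *	// return means the subscription set changed and a new subscribe
 *	// message needs to go out (compare ceph_monc_request_next_osdmap(),
 *	// which sends one itself)
 *	if (ceph_monc_want_map(monc, CEPH_SUB_MDSMAP, have + 1, true))
 *		...
 *
 *	// later, after decoding and applying a map of epoch "epoch"
 *	ceph_monc_got_map(monc, CEPH_SUB_MDSMAP, epoch);
 */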

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
        dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
        mutex_lock(&monc->mutex);
        if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
                                 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
                __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_request_next_osdmap);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
                          unsigned long timeout)
{
        unsigned long started = jiffies;
        long ret;

        mutex_lock(&monc->mutex);
        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                mutex_unlock(&monc->mutex);

                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;

                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
                                     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
                                     ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;

                mutex_lock(&monc->mutex);
        }

        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
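
/*
 * For example, a caller that has just been told the cluster is at osdmap
 * epoch "want_epoch" could do (hypothetical sketch, error handling trimmed):
 *
 *	ceph_monc_request_next_osdmap(monc);
 *	err = ceph_monc_wait_osdmap(monc, want_epoch, 60 * HZ);
 *	if (err == -ETIMEDOUT)
 *		...
 *
 * to block until the local osdmap catches up or the timeout expires.
 */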

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(p, end);
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                goto out;
        }

        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        client->monc.monmap = monmap;
        kfree(old);

        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
        client->have_fsid = true;

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
        struct ceph_mon_client *monc, u64 tid)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *n = monc->generic_request_tree.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_mon_generic_request, node);
                if (tid < req->tid)
                        n = n->rb_left;
                else if (tid > req->tid)
                        n = n->rb_right;
                else
                        return req;
        }
        return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
                            struct ceph_mon_generic_request *new)
{
        struct rb_node **p = &monc->generic_request_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_mon_generic_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_mon_generic_request, node);
                if (new->tid < req->tid)
                        p = &(*p)->rb_left;
                else if (new->tid > req->tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->node, parent, p);
        rb_insert_color(&new->node, &monc->generic_request_tree);
}

static void release_generic_request(struct kref *kref)
{
        struct ceph_mon_generic_request *req =
                container_of(kref, struct ceph_mon_generic_request, kref);

        if (req->reply)
                ceph_msg_put(req->reply);
        if (req->request)
                ceph_msg_put(req->request);

        kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
        kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
                                         struct ceph_msg_header *hdr,
                                         int *skip)
{
        struct ceph_mon_client *monc = con->private;
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(hdr->tid);
        struct ceph_msg *m;

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (!req) {
                dout("get_generic_reply %lld dne\n", tid);
                *skip = 1;
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
                *skip = 0;
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
                 * this reply because we only have one open connection
                 * at a time, ever.
                 */
        }
        mutex_unlock(&monc->mutex);
        return m;
}

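/*
 * Called with monc->mutex held; the mutex is dropped while we wait for the
 * reply to arrive and re-taken before returning, so the caller still owns
 * (and must release) it afterwards.
 */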
static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
                                struct ceph_mon_generic_request *req)
{
        int err;

        /* register request */
        req->tid = tid != 0 ? tid : ++monc->last_tid;
        req->request->hdr.tid = cpu_to_le64(req->tid);
        __insert_generic_request(monc, req);
        monc->num_generic_requests++;
        ceph_con_send(&monc->con, ceph_msg_get(req->request));
        mutex_unlock(&monc->mutex);

        err = wait_for_completion_interruptible(&req->completion);

        mutex_lock(&monc->mutex);
        rb_erase(&req->node, &monc->generic_request_tree);
        monc->num_generic_requests--;

        if (!err)
                err = req->result;
        return err;
}

static int do_generic_request(struct ceph_mon_client *monc,
                              struct ceph_mon_generic_request *req)
{
        int err;

        mutex_lock(&monc->mutex);
        err = __do_generic_request(monc, 0, req);
        mutex_unlock(&monc->mutex);

        return err;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        if (msg->front.iov_len != sizeof(*reply))
                goto bad;
        dout("handle_statfs_reply %p tid %llu\n", msg, tid);

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (req) {
                *(struct ceph_statfs *)req->buf = reply->st;
                req->result = 0;
                get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
        if (req) {
                complete_all(&req->completion);
                put_generic_request(req);
        }
        return;

bad:
        pr_err("corrupt statfs reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int err;

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (!req)
                return -ENOMEM;

        kref_init(&req->kref);
        req->buf = buf;
        init_completion(&req->completion);

        err = -ENOMEM;
        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
                                    true);
        if (!req->request)
                goto out;
        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
                                  true);
        if (!req->reply)
                goto out;

        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;

        err = do_generic_request(monc, req);

out:
        put_generic_request(req);
        return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
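
/*
 * A filesystem statfs handler would typically call this along the lines of
 * (hypothetical sketch; "client" is the caller's struct ceph_client):
 *
 *	struct ceph_statfs st;
 *	int err = ceph_monc_do_statfs(&client->monc, &st);
 *
 *	if (!err)
 *		... convert the on-wire counters in st into the caller's
 *		    struct kstatfs ...
 */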

static void handle_get_version_reply(struct ceph_mon_client *monc,
                                     struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(msg->hdr.tid);
        void *p = msg->front.iov_base;
        void *end = p + msg->front_alloc_len;
        u64 handle;

        dout("%s %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
        handle = ceph_decode_64(&p);
        if (tid != 0 && tid != handle)
                goto bad;

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, handle);
        if (req) {
                *(u64 *)req->buf = ceph_decode_64(&p);
                req->result = 0;
                get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
        if (req) {
                complete_all(&req->completion);
                put_generic_request(req);
        }

        return;
bad:
        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
                             u64 *newest)
{
        struct ceph_mon_generic_request *req;
        void *p, *end;
        u64 tid;
        int err;

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (!req)
                return -ENOMEM;

        kref_init(&req->kref);
        req->buf = newest;
        init_completion(&req->completion);

        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
                                    sizeof(u64) + sizeof(u32) + strlen(what),
                                    GFP_NOFS, true);
        if (!req->request) {
                err = -ENOMEM;
                goto out;
        }

        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
                                  GFP_NOFS, true);
        if (!req->reply) {
                err = -ENOMEM;
                goto out;
        }

        p = req->request->front.iov_base;
        end = p + req->request->front_alloc_len;

        /* fill out request */
        mutex_lock(&monc->mutex);
        tid = ++monc->last_tid;
        ceph_encode_64(&p, tid); /* handle */
        ceph_encode_string(&p, end, what, strlen(what));

        err = __do_generic_request(monc, tid, req);

        mutex_unlock(&monc->mutex);
out:
        put_generic_request(req);
        return err;
}
EXPORT_SYMBOL(ceph_monc_do_get_version);
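
/*
 * For instance, to ask the monitors for the newest osdmap epoch without
 * subscribing to the map itself (hypothetical caller):
 *
 *	u64 newest = 0;
 *	int err = ceph_monc_do_get_version(monc, "osdmap", &newest);
 */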

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *p;

        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_mon_generic_request, node);
                ceph_msg_revoke(req->request);
                ceph_msg_revoke_incoming(req->reply);
                ceph_con_send(&monc->con, ceph_msg_get(req->request));
        }
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
        struct ceph_mon_client *monc =
                container_of(work, struct ceph_mon_client, delayed_work.work);

        dout("monc delayed_work\n");
        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                dout("%s continuing hunt\n", __func__);
                reopen_session(monc);
        } else {
                int is_auth = ceph_auth_is_authenticated(monc->auth);
                if (ceph_con_keepalive_expired(&monc->con,
                                               CEPH_MONC_PING_TIMEOUT)) {
                        dout("monc keepalive timeout\n");
                        is_auth = 0;
                        reopen_session(monc);
                }

                if (!monc->hunting) {
                        ceph_con_keepalive(&monc->con);
                        __validate_auth(monc);
                }

                if (is_auth) {
                        unsigned long now = jiffies;

                        dout("%s renew subs? now %lu renew after %lu\n",
                             __func__, now, monc->sub_renew_after);
                        if (time_after_eq(now, monc->sub_renew_after))
                                __send_subscribe(monc);
                }
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
        struct ceph_options *opt = monc->client->options;
        struct ceph_entity_addr *mon_addr = opt->mon_addr;
        int num_mon = opt->num_mon;
        int i;

        /* build initial monmap */
        monc->monmap = kzalloc(sizeof(*monc->monmap) +
                               num_mon*sizeof(monc->monmap->mon_inst[0]),
                               GFP_KERNEL);
        if (!monc->monmap)
                return -ENOMEM;
        for (i = 0; i < num_mon; i++) {
                monc->monmap->mon_inst[i].addr = mon_addr[i];
                monc->monmap->mon_inst[i].addr.nonce = 0;
                monc->monmap->mon_inst[i].name.type =
                        CEPH_ENTITY_TYPE_MON;
                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
        }
        monc->monmap->num_mon = num_mon;
        return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
        int err = 0;

        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        monc->monmap = NULL;
        mutex_init(&monc->mutex);

        err = build_initial_monmap(monc);
        if (err)
                goto out;

        /* connection */
        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name,
                                    cl->options->key);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
                goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

        /* msgs */
        err = -ENOMEM;
        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
                                     sizeof(struct ceph_mon_subscribe_ack),
                                     GFP_NOFS, true);
        if (!monc->m_subscribe_ack)
                goto out_auth;

        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
                                         true);
        if (!monc->m_subscribe)
                goto out_subscribe_ack;

        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
                                          true);
        if (!monc->m_auth_reply)
                goto out_subscribe;

        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
        monc->pending_auth = 0;
        if (!monc->m_auth)
                goto out_auth_reply;

        ceph_con_init(&monc->con, monc, &mon_con_ops,
                      &monc->client->msgr);

        monc->cur_mon = -1;
        monc->had_a_connection = false;
        monc->hunt_mult = 1;

        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->num_generic_requests = 0;
        monc->last_tid = 0;

        return 0;

out_auth_reply:
        ceph_msg_put(monc->m_auth_reply);
out_subscribe:
        ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
        ceph_msg_put(monc->m_subscribe_ack);
out_auth:
        ceph_auth_destroy(monc->auth);
out_monmap:
        kfree(monc->monmap);
out:
        return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&monc->delayed_work);

        mutex_lock(&monc->mutex);
        __close_session(monc);
        monc->cur_mon = -1;
        mutex_unlock(&monc->mutex);

        /*
         * flush msgr queue before we destroy ourselves to ensure that:
         *  - any work that references our embedded con is finished.
         *  - any osd_client or other work that may reference an authorizer
         *    finishes before we shut down the auth subsystem.
         */
        ceph_msgr_flush();

        ceph_auth_destroy(monc->auth);

        ceph_msg_put(monc->m_auth);
        ceph_msg_put(monc->m_auth_reply);
        ceph_msg_put(monc->m_subscribe);
        ceph_msg_put(monc->m_subscribe_ack);

        kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
        if (monc->hunting) {
                dout("%s found mon%d\n", __func__, monc->cur_mon);
                monc->hunting = false;
                monc->had_a_connection = true;
                monc->hunt_mult /= 2; /* reduce by 50% */
                if (monc->hunt_mult < 1)
                        monc->hunt_mult = 1;
        }
}

static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
{
        int ret;
        int was_auth = 0;

        mutex_lock(&monc->mutex);
        was_auth = ceph_auth_is_authenticated(monc->auth);
        monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_alloc_len);
        if (ret > 0) {
                __send_prepared_auth_request(monc, ret);
                goto out;
        }

        finish_hunting(monc);

        if (ret < 0) {
                monc->client->auth_err = ret;
        } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");

                monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
                monc->client->msgr.inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);

                __send_subscribe(monc);
                __resend_generic_request(monc);

                pr_info("mon%d %s session established\n", monc->cur_mon,
                        ceph_pr_addr(&monc->con.peer_addr.in_addr));
        }

out:
        mutex_unlock(&monc->mutex);
        if (monc->client->auth_err < 0)
                wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        if (monc->pending_auth)
                return 0;

        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
                              monc->m_auth->front_alloc_len);
        if (ret <= 0)
                return ret; /* either an error, or no need to authenticate */
        __send_prepared_auth_request(monc, ret);
        return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        mutex_lock(&monc->mutex);
        ret = __validate_auth(monc);
        mutex_unlock(&monc->mutex);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);
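
/*
 * Other subsystems call this when their authorizer is rejected, e.g. a
 * hypothetical sketch of such a callback (assuming the usual auth helpers):
 *
 *	ceph_auth_invalidate_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD);
 *	return ceph_monc_validate_auth(monc);
 *
 * so that a fresh ticket is requested from the monitor if needed.
 */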

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(msg->hdr.type);

        if (!monc)
                return;

        switch (type) {
        case CEPH_MSG_AUTH_REPLY:
                handle_auth_reply(monc, msg);
                break;

        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                handle_subscribe_ack(monc, msg);
                break;

        case CEPH_MSG_STATFS_REPLY:
                handle_statfs_reply(monc, msg);
                break;

        case CEPH_MSG_MON_GET_VERSION_REPLY:
                handle_get_version_reply(monc, msg);
                break;

        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;

        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(&monc->client->osdc, msg);
                break;

        default:
                /* can the chained handler handle it? */
                if (monc->client->extra_mon_dispatch &&
                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
                        break;

                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
        ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr,
                                      int *skip)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        struct ceph_msg *m = NULL;

        *skip = 0;

        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
        case CEPH_MSG_STATFS_REPLY:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
                m = ceph_msg_get(monc->m_auth_reply);
                break;
        case CEPH_MSG_MON_GET_VERSION_REPLY:
                if (le64_to_cpu(hdr->tid) != 0)
                        return get_generic_reply(con, hdr, skip);

                /*
                 * Older OSDs don't set the reply tid even if the original
                 * request had a non-zero tid.  Work around this weirdness
                 * by falling through to the allocate case.
                 */
        case CEPH_MSG_MON_MAP:
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP:
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
                if (!m)
                        return NULL;    /* ENOMEM--return skip == 0 */
                break;
        }

        if (!m) {
                pr_info("alloc_msg unknown type %d\n", type);
                *skip = 1;
        } else if (front_len > m->front_alloc_len) {
                pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
                        front_len, m->front_alloc_len,
                        (unsigned int)con->peer_name.type,
                        le64_to_cpu(con->peer_name.num));
                ceph_msg_put(m);
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
        }

        return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
        struct ceph_mon_client *monc = con->private;

        mutex_lock(&monc->mutex);
        dout("%s mon%d\n", __func__, monc->cur_mon);
        if (monc->cur_mon >= 0) {
                if (!monc->hunting) {
                        dout("%s hunting for new mon\n", __func__);
                        reopen_session(monc);
                        __schedule_delayed(monc);
                } else {
                        dout("%s already hunting\n", __func__);
                }
        }
        mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
        return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
};