linux/net/ceph/mon_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
        struct ceph_monmap *m = NULL;
        int i, err = -EINVAL;
        struct ceph_fsid fsid;
        u32 epoch, num_mon;
        u32 len;

        ceph_decode_32_safe(&p, end, len, bad);
        ceph_decode_need(&p, end, len, bad);

        dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
        p += sizeof(u16);  /* skip version */

        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(&p);

        num_mon = ceph_decode_32(&p);
        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

        if (num_mon >= CEPH_MAX_MON)
                goto bad;
        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
        if (m == NULL)
                return ERR_PTR(-ENOMEM);
        m->fsid = fsid;
        m->epoch = epoch;
        m->num_mon = num_mon;
        ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
        for (i = 0; i < num_mon; i++)
                ceph_decode_addr(&m->mon_inst[i].addr);

        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
             m->num_mon);
        for (i = 0; i < m->num_mon; i++)
                dout("monmap_decode  mon%d is %s\n", i,
                     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
        return m;

bad:
        dout("monmap_decode failed with %d\n", err);
        kfree(m);
        return ERR_PTR(err);
}

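/*
 * For reference, the monmap blob consumed above has this layout, as
 * implied by the decode calls (a reader's sketch, not a formal wire
 * spec):
 *
 *     u32 len                                 payload length
 *     u16 version                             skipped
 *     struct ceph_fsid fsid
 *     u32 epoch
 *     u32 num_mon
 *     struct ceph_entity_inst mon_inst[num_mon]
 */
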
/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
        int i;

        for (i = 0; i < m->num_mon; i++)
                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
                        return 1;
        return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_get(monc->m_auth);  /* keep our ref */
        ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        dout("__close_session closing mon%d\n", monc->cur_mon);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_revoke_incoming(monc->m_auth_reply);
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);

        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
        int old_mon = monc->cur_mon;

        BUG_ON(monc->monmap->num_mon < 1);

        if (monc->monmap->num_mon == 1) {
                monc->cur_mon = 0;
        } else {
                int max = monc->monmap->num_mon;
                int o = -1;
                int n;

                if (monc->cur_mon >= 0) {
                        if (monc->cur_mon < monc->monmap->num_mon)
                                o = monc->cur_mon;
                        if (o >= 0)
                                max--;
                }

                n = prandom_u32() % max;
                if (o >= 0 && n >= o)
                        n++;

                monc->cur_mon = n;
        }

        dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
             monc->cur_mon, monc->monmap->num_mon);
}

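/*
 * Worked example of the repick math above (illustrative only): with
 * num_mon == 3 and cur_mon == 1, max drops to 2 and n is drawn from
 * {0, 1}; the "n >= o" adjustment maps that to {0, 2}, so we never
 * repick the current monitor while the remaining monitors stay
 * equally likely.
 */
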
/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
        int ret;

        pick_new_mon(monc);

        monc->hunting = true;
        if (monc->had_a_connection) {
                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
        }

        monc->sub_renew_after = jiffies; /* i.e., expired */
        monc->sub_renew_sent = 0;

        dout("%s opening mon%d\n", __func__, monc->cur_mon);
        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                      &monc->monmap->mon_inst[monc->cur_mon].addr);

        /*
         * send an initial keepalive to ensure our timestamp is valid
         * by the time we are in an OPENED state
         */
        ceph_con_keepalive(&monc->con);

        /* initiate authentication handshake */
        ret = ceph_auth_build_hello(monc->auth,
                                    monc->m_auth->front.iov_base,
                                    monc->m_auth->front_alloc_len);
        BUG_ON(ret <= 0);
        __send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
        if (!monc->hunting)
                pr_info("mon%d %s session lost, hunting for new mon\n",
                    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));

        __close_session(monc);
        __open_session(monc);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
        unsigned long delay;

        if (monc->hunting)
                delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
        else
                delay = CEPH_MONC_PING_INTERVAL;

        dout("__schedule_delayed after %lu\n", delay);
        mod_delayed_work(system_wq, &monc->delayed_work,
                         round_jiffies_relative(delay));
}

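/*
 * Timing sketch, derived from the helpers above (symbolic; see
 * mon_client.h for the actual constants): while hunting, the delayed
 * work fires after CEPH_MONC_HUNT_INTERVAL * hunt_mult; __open_session()
 * scales hunt_mult by CEPH_MONC_HUNT_BACKOFF on each attempt after the
 * first successful connection, capped at CEPH_MONC_HUNT_MAX_MULT, and
 * finish_hunting() halves it again once a session is established.
 * Outside of hunting we simply ping every CEPH_MONC_PING_INTERVAL.
 */
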
const char *ceph_sub_str[] = {
        [CEPH_SUB_MONMAP] = "monmap",
        [CEPH_SUB_OSDMAP] = "osdmap",
        [CEPH_SUB_FSMAP]  = "fsmap.user",
        [CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
        struct ceph_msg *msg = monc->m_subscribe;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        int num = 0;
        int i;

        dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

        BUG_ON(monc->cur_mon < 0);

        if (!monc->sub_renew_sent)
                monc->sub_renew_sent = jiffies | 1; /* never 0 */

        msg->hdr.version = cpu_to_le16(2);

        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                if (monc->subs[i].want)
                        num++;
        }
        BUG_ON(num < 1); /* monmap sub is always there */
        ceph_encode_32(&p, num);
        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                char buf[32];
                int len;

                if (!monc->subs[i].want)
                        continue;

                len = sprintf(buf, "%s", ceph_sub_str[i]);
                if (i == CEPH_SUB_MDSMAP &&
                    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
                        len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

                dout("%s %s start %llu flags 0x%x\n", __func__, buf,
                     le64_to_cpu(monc->subs[i].item.start),
                     monc->subs[i].item.flags);
                ceph_encode_string(&p, end, buf, len);
                memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
                p += sizeof(monc->subs[i].item);
        }

        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
        ceph_msg_revoke(msg);
        ceph_con_send(&monc->con, ceph_msg_get(msg));
}

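/*
 * The subscribe payload built above is, per the encode calls (a sketch
 * of what this function writes, not a formal spec):
 *
 *     u32 num
 *     num times:
 *         string                            "monmap", "osdmap", "fsmap.user"
 *                                           or "mdsmap[.<fs_cluster_id>]"
 *         struct ceph_mon_subscribe_item    { start, flags }
 */
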
static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->sub_renew_sent) {
                /*
                 * This is only needed for legacy (infernalis or older)
                 * MONs -- see delayed_work().
                 */
                monc->sub_renew_after = monc->sub_renew_sent +
                                            (seconds >> 1) * HZ - 1;
                dout("%s sent %lu duration %d renew after %lu\n", __func__,
                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
                monc->sub_renew_sent = 0;
        } else {
                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
                     monc->sub_renew_sent, monc->sub_renew_after);
        }
        mutex_unlock(&monc->mutex);
        return;
bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
                                 u32 epoch, bool continuous)
{
        __le64 start = cpu_to_le64(epoch);
        u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

        dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
             epoch, continuous);

        if (monc->subs[sub].want &&
            monc->subs[sub].item.start == start &&
            monc->subs[sub].item.flags == flags)
                return false;

        monc->subs[sub].item.start = start;
        monc->subs[sub].item.flags = flags;
        monc->subs[sub].want = true;

        return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
                        bool continuous)
{
        bool need_request;

        mutex_lock(&monc->mutex);
        need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
        mutex_unlock(&monc->mutex);

        return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

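/*
 * Hypothetical caller sketch (not from this file; "have_epoch" is an
 * assumed local): ask for every osdmap newer than the one we hold and
 * kick a subscribe only if the wanted item actually changed:
 *
 *     if (ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP,
 *                            have_epoch + 1, false))
 *             ceph_monc_renew_subs(&client->monc);
 */
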
/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
                                u32 epoch)
{
        dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

        if (monc->subs[sub].want) {
                if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
                        monc->subs[sub].want = false;
                else
                        monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
        }

        monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
                          unsigned long timeout)
{
        unsigned long started = jiffies;
        long ret;

        mutex_lock(&monc->mutex);
        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                mutex_unlock(&monc->mutex);

                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;

                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
                                     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
                                     ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;

                mutex_lock(&monc->mutex);
        }

        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);

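/*
 * Illustrative use (the surrounding client/epoch/timeout names are
 * assumptions, not taken from this file):
 *
 *     ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP, epoch, false);
 *     ceph_monc_renew_subs(&client->monc);
 *     ret = ceph_monc_wait_osdmap(&client->monc, epoch, timeout);
 */
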
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(p, end);
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                goto out;
        }

        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        client->monc.monmap = monmap;
        kfree(old);

        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
        client->have_fsid = true;

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
        struct ceph_mon_generic_request *req =
                container_of(kref, struct ceph_mon_generic_request, kref);

        dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
             req->reply);
        WARN_ON(!RB_EMPTY_NODE(&req->node));

        if (req->reply)
                ceph_msg_put(req->reply);
        if (req->request)
                ceph_msg_put(req->request);

        kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
        if (req)
                kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
        struct ceph_mon_generic_request *req;

        req = kzalloc(sizeof(*req), gfp);
        if (!req)
                return NULL;

        req->monc = monc;
        kref_init(&req->kref);
        RB_CLEAR_NODE(&req->node);
        init_completion(&req->completion);

        dout("%s greq %p\n", __func__, req);
        return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        WARN_ON(req->tid);

        get_generic_request(req);
        req->tid = ++monc->last_tid;
        insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
                                 struct ceph_mon_generic_request *req)
{
        WARN_ON(!req->tid);

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        req->request->hdr.tid = cpu_to_le64(req->tid);
        ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        erase_generic_request(&monc->generic_request_tree, req);

        ceph_msg_revoke(req->request);
        ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
        __finish_generic_request(req);
        put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
        if (req->complete_cb)
                req->complete_cb(req);
        else
                complete_all(&req->completion);
        put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;
        struct ceph_mon_generic_request *lookup_req;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);

        mutex_lock(&monc->mutex);
        lookup_req = lookup_generic_request(&monc->generic_request_tree,
                                            req->tid);
        if (lookup_req) {
                WARN_ON(lookup_req != req);
                finish_generic_request(req);
        }

        mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
        int ret;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        ret = wait_for_completion_interruptible(&req->completion);
        if (ret)
                cancel_generic_request(req);
        else
                ret = req->result; /* completed */

        return ret;
}

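/*
 * Lifecycle of a generic request, as used by the statfs, get_version
 * and mon_command paths below: alloc_generic_request() ->
 * register_generic_request() (assigns a tid and takes an extra ref) ->
 * send_generic_request() -> the reply handler fills req->result and
 * calls __finish_generic_request() -> complete_generic_request()
 * (callback or completion) -> put_generic_request() from the caller.
 */
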
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
                                         struct ceph_msg_header *hdr,
                                         int *skip)
{
        struct ceph_mon_client *monc = con->private;
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(hdr->tid);
        struct ceph_msg *m;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                dout("get_generic_reply %lld dne\n", tid);
                *skip = 1;
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
                *skip = 0;
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
                 * this reply because we only have one open connection
                 * at a time, ever.
                 */
        }
        mutex_unlock(&monc->mutex);
        return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        if (msg->front.iov_len != sizeof(*reply))
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        *req->u.st = reply->st; /* struct */
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt statfs reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int ret = -ENOMEM;

        req = alloc_generic_request(monc, GFP_NOFS);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
                                    true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
        if (!req->reply)
                goto out;

        req->u.st = buf;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
out:
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

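/*
 * Hypothetical caller sketch (field access elided; see struct
 * ceph_statfs for the actual layout):
 *
 *     struct ceph_statfs st;
 *     int err = ceph_monc_do_statfs(&client->monc, &st);
 */
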
static void handle_get_version_reply(struct ceph_mon_client *monc,
                                     struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(msg->hdr.tid);
        void *p = msg->front.iov_base;
        void *end = p + msg->front_alloc_len;
        u64 handle;

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
        handle = ceph_decode_64(&p);
        if (tid != 0 && tid != handle)
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, handle);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        req->u.newest = ceph_decode_64(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
                        ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto err_put_req;

        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
                                    sizeof(u64) + sizeof(u32) + strlen(what),
                                    GFP_NOIO, true);
        if (!req->request)
                goto err_put_req;

        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto err_put_req;

        req->complete_cb = cb;
        req->private_data = private_data;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        {
                void *p = req->request->front.iov_base;
                void *const end = p + req->request->front_alloc_len;

                ceph_encode_64(&p, req->tid); /* handle */
                ceph_encode_string(&p, end, what, strlen(what));
                WARN_ON(p != end);
        }
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        return req;

err_put_req:
        put_generic_request(req);
        return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
                          u64 *newest)
{
        struct ceph_mon_generic_request *req;
        int ret;

        req = __ceph_monc_get_version(monc, what, NULL, 0);
        if (IS_ERR(req))
                return PTR_ERR(req);

        ret = wait_generic_request(req);
        if (!ret)
                *newest = req->u.newest;

        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

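/*
 * Hypothetical caller sketch:
 *
 *     u64 newest;
 *     int err = ceph_monc_get_version(&client->monc, "osdmap", &newest);
 */
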
/*
 * Send MMonGetVersion without waiting for the reply; @cb is invoked
 * once the reply arrives.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
                                ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = __ceph_monc_get_version(monc, what, cb, private_data);
        if (IS_ERR(req))
                return PTR_ERR(req);

        put_generic_request(req);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

static void handle_command_ack(struct ceph_mon_client *monc,
                               struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
                                                            sizeof(u32), bad);
        p += sizeof(struct ceph_mon_request_header);

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = ceph_decode_32(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_command ack, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
                            struct ceph_entity_addr *client_addr)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_command *h;
        int ret = -ENOMEM;
        int len;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto out;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        h->num_strs = cpu_to_le32(1);
        len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
                                 \"blacklistop\": \"add\", \
                                 \"addr\": \"%pISpc/%u\" }",
                      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
        h->str_len = cpu_to_le32(len);
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
out:
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);

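/*
 * Note: the blacklist helper above is simply a MON_COMMAND user -- the
 * request carries one JSON string ("osd blacklist add <addr>") and the
 * mon's result comes back as a CEPH_MSG_MON_COMMAND_ACK, which is
 * decoded by handle_command_ack() above.
 */
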
/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *p;

        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_mon_generic_request, node);
                ceph_msg_revoke(req->request);
                ceph_msg_revoke_incoming(req->reply);
                ceph_con_send(&monc->con, ceph_msg_get(req->request));
        }
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
        struct ceph_mon_client *monc =
                container_of(work, struct ceph_mon_client, delayed_work.work);

        dout("monc delayed_work\n");
        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                dout("%s continuing hunt\n", __func__);
                reopen_session(monc);
        } else {
                int is_auth = ceph_auth_is_authenticated(monc->auth);
                if (ceph_con_keepalive_expired(&monc->con,
                                               CEPH_MONC_PING_TIMEOUT)) {
                        dout("monc keepalive timeout\n");
                        is_auth = 0;
                        reopen_session(monc);
                }

                if (!monc->hunting) {
                        ceph_con_keepalive(&monc->con);
                        __validate_auth(monc);
                }

                if (is_auth &&
                    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
                        unsigned long now = jiffies;

                        dout("%s renew subs? now %lu renew after %lu\n",
                             __func__, now, monc->sub_renew_after);
                        if (time_after_eq(now, monc->sub_renew_after))
                                __send_subscribe(monc);
                }
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
        struct ceph_options *opt = monc->client->options;
        struct ceph_entity_addr *mon_addr = opt->mon_addr;
        int num_mon = opt->num_mon;
        int i;

        /* build initial monmap */
        monc->monmap = kzalloc(sizeof(*monc->monmap) +
                               num_mon*sizeof(monc->monmap->mon_inst[0]),
                               GFP_KERNEL);
        if (!monc->monmap)
                return -ENOMEM;
        for (i = 0; i < num_mon; i++) {
                monc->monmap->mon_inst[i].addr = mon_addr[i];
                monc->monmap->mon_inst[i].addr.nonce = 0;
                monc->monmap->mon_inst[i].name.type =
                        CEPH_ENTITY_TYPE_MON;
                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
        }
        monc->monmap->num_mon = num_mon;
        return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
        int err = 0;

        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        monc->monmap = NULL;
        mutex_init(&monc->mutex);

        err = build_initial_monmap(monc);
        if (err)
                goto out;

        /* connection */
        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name,
                                    cl->options->key);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
                goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

        /* msgs */
        err = -ENOMEM;
        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
                                     sizeof(struct ceph_mon_subscribe_ack),
                                     GFP_KERNEL, true);
        if (!monc->m_subscribe_ack)
                goto out_auth;

        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
                                         GFP_KERNEL, true);
        if (!monc->m_subscribe)
                goto out_subscribe_ack;

        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
                                          GFP_KERNEL, true);
        if (!monc->m_auth_reply)
                goto out_subscribe;

        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
        monc->pending_auth = 0;
        if (!monc->m_auth)
                goto out_auth_reply;

        ceph_con_init(&monc->con, monc, &mon_con_ops,
                      &monc->client->msgr);

        monc->cur_mon = -1;
        monc->had_a_connection = false;
        monc->hunt_mult = 1;

        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->last_tid = 0;

        monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

        return 0;

out_auth_reply:
        ceph_msg_put(monc->m_auth_reply);
out_subscribe:
        ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
        ceph_msg_put(monc->m_subscribe_ack);
out_auth:
        ceph_auth_destroy(monc->auth);
out_monmap:
        kfree(monc->monmap);
out:
        return err;
}
EXPORT_SYMBOL(ceph_monc_init);

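/*
 * Typical lifecycle, sketched (driven by the libceph client setup
 * code rather than anything in this file): ceph_monc_init() ->
 * ceph_monc_open_session() -> map subscriptions and generic requests
 * as needed -> ceph_monc_stop().
 */
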
void ceph_monc_stop(struct ceph_mon_client *monc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&monc->delayed_work);

        mutex_lock(&monc->mutex);
        __close_session(monc);
        monc->cur_mon = -1;
        mutex_unlock(&monc->mutex);

        /*
         * flush msgr queue before we destroy ourselves to ensure that:
         *  - any work that references our embedded con is finished.
         *  - any osd_client or other work that may reference an authorizer
         *    finishes before we shut down the auth subsystem.
         */
        ceph_msgr_flush();

        ceph_auth_destroy(monc->auth);

        WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

        ceph_msg_put(monc->m_auth);
        ceph_msg_put(monc->m_auth_reply);
        ceph_msg_put(monc->m_subscribe);
        ceph_msg_put(monc->m_subscribe_ack);

        kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
        if (monc->hunting) {
                dout("%s found mon%d\n", __func__, monc->cur_mon);
                monc->hunting = false;
                monc->had_a_connection = true;
                monc->hunt_mult /= 2; /* reduce by 50% */
                if (monc->hunt_mult < 1)
                        monc->hunt_mult = 1;
        }
}

static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
{
        int ret;
        int was_auth = 0;

        mutex_lock(&monc->mutex);
        was_auth = ceph_auth_is_authenticated(monc->auth);
        monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_alloc_len);
        if (ret > 0) {
                __send_prepared_auth_request(monc, ret);
                goto out;
        }

        finish_hunting(monc);

        if (ret < 0) {
                monc->client->auth_err = ret;
        } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");

                monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
                monc->client->msgr.inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);

                __send_subscribe(monc);
                __resend_generic_request(monc);

                pr_info("mon%d %s session established\n", monc->cur_mon,
                        ceph_pr_addr(&monc->con.peer_addr.in_addr));
        }

out:
        mutex_unlock(&monc->mutex);
        if (monc->client->auth_err < 0)
                wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        if (monc->pending_auth)
                return 0;

        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
                              monc->m_auth->front_alloc_len);
        if (ret <= 0)
                return ret; /* either an error, or no need to authenticate */
        __send_prepared_auth_request(monc, ret);
        return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        mutex_lock(&monc->mutex);
        ret = __validate_auth(monc);
        mutex_unlock(&monc->mutex);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(msg->hdr.type);

        if (!monc)
                return;

        switch (type) {
        case CEPH_MSG_AUTH_REPLY:
                handle_auth_reply(monc, msg);
                break;

        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                handle_subscribe_ack(monc, msg);
                break;

        case CEPH_MSG_STATFS_REPLY:
                handle_statfs_reply(monc, msg);
                break;

        case CEPH_MSG_MON_GET_VERSION_REPLY:
                handle_get_version_reply(monc, msg);
                break;

        case CEPH_MSG_MON_COMMAND_ACK:
                handle_command_ack(monc, msg);
                break;

        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;

        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(&monc->client->osdc, msg);
                break;

        default:
                /* can the chained handler handle it? */
                if (monc->client->extra_mon_dispatch &&
                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
                        break;

                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
        ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr,
                                      int *skip)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        struct ceph_msg *m = NULL;

        *skip = 0;

        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
        case CEPH_MSG_STATFS_REPLY:
        case CEPH_MSG_MON_COMMAND_ACK:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
                m = ceph_msg_get(monc->m_auth_reply);
                break;
        case CEPH_MSG_MON_GET_VERSION_REPLY:
                if (le64_to_cpu(hdr->tid) != 0)
                        return get_generic_reply(con, hdr, skip);

                /*
                 * Older OSDs don't set the reply tid even if the original
                 * request had a non-zero tid.  Work around this weirdness
                 * by falling through to the allocate case.
                 */
        case CEPH_MSG_MON_MAP:
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_FS_MAP_USER:
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
                if (!m)
                        return NULL;    /* ENOMEM--return skip == 0 */
                break;
        }

        if (!m) {
                pr_info("alloc_msg unknown type %d\n", type);
                *skip = 1;
        } else if (front_len > m->front_alloc_len) {
                pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
                        front_len, m->front_alloc_len,
                        (unsigned int)con->peer_name.type,
                        le64_to_cpu(con->peer_name.num));
                ceph_msg_put(m);
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
        }

        return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
        struct ceph_mon_client *monc = con->private;

        mutex_lock(&monc->mutex);
        dout("%s mon%d\n", __func__, monc->cur_mon);
        if (monc->cur_mon >= 0) {
                if (!monc->hunting) {
                        dout("%s hunting for new mon\n", __func__);
                        reopen_session(monc);
                        __schedule_delayed(monc);
                } else {
                        dout("%s already hunting\n", __func__);
                }
        }
        mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
        return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
};