linux/net/ceph/mon_client.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/module.h>
   4#include <linux/types.h>
   5#include <linux/slab.h>
   6#include <linux/random.h>
   7#include <linux/sched.h>
   8
   9#include <linux/ceph/mon_client.h>
  10#include <linux/ceph/libceph.h>
  11#include <linux/ceph/debugfs.h>
  12#include <linux/ceph/decode.h>
  13#include <linux/ceph/auth.h>
  14
  15/*
  16 * Interact with Ceph monitor cluster.  Handle requests for new map
  17 * versions, and periodically resend as needed.  Also implement
  18 * statfs() and umount().
  19 *
  20 * A small cluster of Ceph "monitors" are responsible for managing critical
  21 * cluster configuration and state information.  An odd number (e.g., 3, 5)
  22 * of cmon daemons use a modified version of the Paxos part-time parliament
  23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
  24 * list of clients who have mounted the file system.
  25 *
  26 * We maintain an open, active session with a monitor at all times in order to
  27 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
  28 * TCP socket to ensure we detect a failure.  If the connection does break, we
  29 * randomly hunt for a new monitor.  Once the connection is reestablished, we
  30 * resend any outstanding requests.
  31 */
  32
  33static const struct ceph_connection_operations mon_con_ops;
  34
  35static int __validate_auth(struct ceph_mon_client *monc);
  36
  37/*
  38 * Decode a monmap blob (e.g., during mount).
  39 */
  40struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
  41{
  42        struct ceph_monmap *m = NULL;
  43        int i, err = -EINVAL;
  44        struct ceph_fsid fsid;
  45        u32 epoch, num_mon;
  46        u16 version;
  47        u32 len;
  48
  49        ceph_decode_32_safe(&p, end, len, bad);
  50        ceph_decode_need(&p, end, len, bad);
  51
  52        dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
  53
  54        ceph_decode_16_safe(&p, end, version, bad);
  55
  56        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
  57        ceph_decode_copy(&p, &fsid, sizeof(fsid));
  58        epoch = ceph_decode_32(&p);
  59
  60        num_mon = ceph_decode_32(&p);
  61        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
  62
  63        if (num_mon >= CEPH_MAX_MON)
  64                goto bad;
  65        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
  66        if (m == NULL)
  67                return ERR_PTR(-ENOMEM);
  68        m->fsid = fsid;
  69        m->epoch = epoch;
  70        m->num_mon = num_mon;
  71        ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
  72        for (i = 0; i < num_mon; i++)
  73                ceph_decode_addr(&m->mon_inst[i].addr);
  74
  75        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
  76             m->num_mon);
  77        for (i = 0; i < m->num_mon; i++)
  78                dout("monmap_decode  mon%d is %s\n", i,
  79                     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
  80        return m;
  81
  82bad:
  83        dout("monmap_decode failed with %d\n", err);
  84        kfree(m);
  85        return ERR_PTR(err);
  86}
  87
  88/*
  89 * return true if *addr is included in the monmap.
  90 */
  91int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
  92{
  93        int i;
  94
  95        for (i = 0; i < m->num_mon; i++)
  96                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
  97                        return 1;
  98        return 0;
  99}
 100
 101/*
 102 * Send an auth request.
 103 */
 104static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
 105{
 106        monc->pending_auth = 1;
 107        monc->m_auth->front.iov_len = len;
 108        monc->m_auth->hdr.front_len = cpu_to_le32(len);
 109        ceph_msg_revoke(monc->m_auth);
 110        ceph_msg_get(monc->m_auth);  /* keep our ref */
 111        ceph_con_send(&monc->con, monc->m_auth);
 112}
 113
 114/*
 115 * Close monitor session, if any.
 116 */
 117static void __close_session(struct ceph_mon_client *monc)
 118{
 119        dout("__close_session closing mon%d\n", monc->cur_mon);
 120        ceph_msg_revoke(monc->m_auth);
 121        ceph_msg_revoke_incoming(monc->m_auth_reply);
 122        ceph_msg_revoke(monc->m_subscribe);
 123        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
 124        ceph_con_close(&monc->con);
 125
 126        monc->pending_auth = 0;
 127        ceph_auth_reset(monc->auth);
 128}
 129
 130/*
 131 * Pick a new monitor at random and set cur_mon.  If we are repicking
 132 * (i.e. cur_mon is already set), be sure to pick a different one.
 133 */
 134static void pick_new_mon(struct ceph_mon_client *monc)
 135{
 136        int old_mon = monc->cur_mon;
 137
 138        BUG_ON(monc->monmap->num_mon < 1);
 139
 140        if (monc->monmap->num_mon == 1) {
 141                monc->cur_mon = 0;
 142        } else {
 143                int max = monc->monmap->num_mon;
 144                int o = -1;
 145                int n;
 146
 147                if (monc->cur_mon >= 0) {
 148                        if (monc->cur_mon < monc->monmap->num_mon)
 149                                o = monc->cur_mon;
 150                        if (o >= 0)
 151                                max--;
 152                }
 153
 154                n = prandom_u32() % max;
 155                if (o >= 0 && n >= o)
 156                        n++;
 157
 158                monc->cur_mon = n;
 159        }
 160
 161        dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
 162             monc->cur_mon, monc->monmap->num_mon);
 163}
 164
 165/*
 166 * Open a session with a new monitor.
 167 */
 168static void __open_session(struct ceph_mon_client *monc)
 169{
 170        int ret;
 171
 172        pick_new_mon(monc);
 173
 174        monc->hunting = true;
 175        if (monc->had_a_connection) {
 176                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
 177                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
 178                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
 179        }
 180
 181        monc->sub_renew_after = jiffies; /* i.e., expired */
 182        monc->sub_renew_sent = 0;
 183
 184        dout("%s opening mon%d\n", __func__, monc->cur_mon);
 185        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
 186                      &monc->monmap->mon_inst[monc->cur_mon].addr);
 187
 188        /*
 189         * send an initial keepalive to ensure our timestamp is valid
 190         * by the time we are in an OPENED state
 191         */
 192        ceph_con_keepalive(&monc->con);
 193
 194        /* initiate authentication handshake */
 195        ret = ceph_auth_build_hello(monc->auth,
 196                                    monc->m_auth->front.iov_base,
 197                                    monc->m_auth->front_alloc_len);
 198        BUG_ON(ret <= 0);
 199        __send_prepared_auth_request(monc, ret);
 200}
 201
 202static void reopen_session(struct ceph_mon_client *monc)
 203{
 204        if (!monc->hunting)
 205                pr_info("mon%d %s session lost, hunting for new mon\n",
 206                    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
 207
 208        __close_session(monc);
 209        __open_session(monc);
 210}
 211
 212/*
 213 * Reschedule delayed work timer.
 214 */
 215static void __schedule_delayed(struct ceph_mon_client *monc)
 216{
 217        unsigned long delay;
 218
 219        if (monc->hunting)
 220                delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
 221        else
 222                delay = CEPH_MONC_PING_INTERVAL;
 223
 224        dout("__schedule_delayed after %lu\n", delay);
 225        mod_delayed_work(system_wq, &monc->delayed_work,
 226                         round_jiffies_relative(delay));
 227}
 228
 229const char *ceph_sub_str[] = {
 230        [CEPH_SUB_MONMAP] = "monmap",
 231        [CEPH_SUB_OSDMAP] = "osdmap",
 232        [CEPH_SUB_FSMAP]  = "fsmap.user",
 233        [CEPH_SUB_MDSMAP] = "mdsmap",
 234};
 235
 236/*
 237 * Send subscribe request for one or more maps, according to
 238 * monc->subs.
 239 */
 240static void __send_subscribe(struct ceph_mon_client *monc)
 241{
 242        struct ceph_msg *msg = monc->m_subscribe;
 243        void *p = msg->front.iov_base;
 244        void *const end = p + msg->front_alloc_len;
 245        int num = 0;
 246        int i;
 247
 248        dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
 249
 250        BUG_ON(monc->cur_mon < 0);
 251
 252        if (!monc->sub_renew_sent)
 253                monc->sub_renew_sent = jiffies | 1; /* never 0 */
 254
 255        msg->hdr.version = cpu_to_le16(2);
 256
 257        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
 258                if (monc->subs[i].want)
 259                        num++;
 260        }
 261        BUG_ON(num < 1); /* monmap sub is always there */
 262        ceph_encode_32(&p, num);
 263        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
 264                char buf[32];
 265                int len;
 266
 267                if (!monc->subs[i].want)
 268                        continue;
 269
 270                len = sprintf(buf, "%s", ceph_sub_str[i]);
 271                if (i == CEPH_SUB_MDSMAP &&
 272                    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
 273                        len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
 274
 275                dout("%s %s start %llu flags 0x%x\n", __func__, buf,
 276                     le64_to_cpu(monc->subs[i].item.start),
 277                     monc->subs[i].item.flags);
 278                ceph_encode_string(&p, end, buf, len);
 279                memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
 280                p += sizeof(monc->subs[i].item);
 281        }
 282
 283        BUG_ON(p > end);
 284        msg->front.iov_len = p - msg->front.iov_base;
 285        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 286        ceph_msg_revoke(msg);
 287        ceph_con_send(&monc->con, ceph_msg_get(msg));
 288}
 289
 290static void handle_subscribe_ack(struct ceph_mon_client *monc,
 291                                 struct ceph_msg *msg)
 292{
 293        unsigned int seconds;
 294        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
 295
 296        if (msg->front.iov_len < sizeof(*h))
 297                goto bad;
 298        seconds = le32_to_cpu(h->duration);
 299
 300        mutex_lock(&monc->mutex);
 301        if (monc->sub_renew_sent) {
 302                monc->sub_renew_after = monc->sub_renew_sent +
 303                                            (seconds >> 1) * HZ - 1;
 304                dout("%s sent %lu duration %d renew after %lu\n", __func__,
 305                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
 306                monc->sub_renew_sent = 0;
 307        } else {
 308                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
 309                     monc->sub_renew_sent, monc->sub_renew_after);
 310        }
 311        mutex_unlock(&monc->mutex);
 312        return;
 313bad:
 314        pr_err("got corrupt subscribe-ack msg\n");
 315        ceph_msg_dump(msg);
 316}
 317
 318/*
 319 * Register interest in a map
 320 *
 321 * @sub: one of CEPH_SUB_*
 322 * @epoch: X for "every map since X", or 0 for "just the latest"
 323 */
 324static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
 325                                 u32 epoch, bool continuous)
 326{
 327        __le64 start = cpu_to_le64(epoch);
 328        u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
 329
 330        dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
 331             epoch, continuous);
 332
 333        if (monc->subs[sub].want &&
 334            monc->subs[sub].item.start == start &&
 335            monc->subs[sub].item.flags == flags)
 336                return false;
 337
 338        monc->subs[sub].item.start = start;
 339        monc->subs[sub].item.flags = flags;
 340        monc->subs[sub].want = true;
 341
 342        return true;
 343}
 344
 345bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
 346                        bool continuous)
 347{
 348        bool need_request;
 349
 350        mutex_lock(&monc->mutex);
 351        need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
 352        mutex_unlock(&monc->mutex);
 353
 354        return need_request;
 355}
 356EXPORT_SYMBOL(ceph_monc_want_map);
 357
 358/*
 359 * Keep track of which maps we have
 360 *
 361 * @sub: one of CEPH_SUB_*
 362 */
 363static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
 364                                u32 epoch)
 365{
 366        dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
 367
 368        if (monc->subs[sub].want) {
 369                if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
 370                        monc->subs[sub].want = false;
 371                else
 372                        monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
 373        }
 374
 375        monc->subs[sub].have = epoch;
 376}
 377
 378void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
 379{
 380        mutex_lock(&monc->mutex);
 381        __ceph_monc_got_map(monc, sub, epoch);
 382        mutex_unlock(&monc->mutex);
 383}
 384EXPORT_SYMBOL(ceph_monc_got_map);
 385
 386void ceph_monc_renew_subs(struct ceph_mon_client *monc)
 387{
 388        mutex_lock(&monc->mutex);
 389        __send_subscribe(monc);
 390        mutex_unlock(&monc->mutex);
 391}
 392EXPORT_SYMBOL(ceph_monc_renew_subs);
 393
 394/*
 395 * Wait for an osdmap with a given epoch.
 396 *
 397 * @epoch: epoch to wait for
 398 * @timeout: in jiffies, 0 means "wait forever"
 399 */
 400int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
 401                          unsigned long timeout)
 402{
 403        unsigned long started = jiffies;
 404        long ret;
 405
 406        mutex_lock(&monc->mutex);
 407        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
 408                mutex_unlock(&monc->mutex);
 409
 410                if (timeout && time_after_eq(jiffies, started + timeout))
 411                        return -ETIMEDOUT;
 412
 413                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
 414                                     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
 415                                     ceph_timeout_jiffies(timeout));
 416                if (ret < 0)
 417                        return ret;
 418
 419                mutex_lock(&monc->mutex);
 420        }
 421
 422        mutex_unlock(&monc->mutex);
 423        return 0;
 424}
 425EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 426
 427/*
 428 * Open a session with a random monitor.  Request monmap and osdmap,
 429 * which are waited upon in __ceph_open_session().
 430 */
 431int ceph_monc_open_session(struct ceph_mon_client *monc)
 432{
 433        mutex_lock(&monc->mutex);
 434        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
 435        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
 436        __open_session(monc);
 437        __schedule_delayed(monc);
 438        mutex_unlock(&monc->mutex);
 439        return 0;
 440}
 441EXPORT_SYMBOL(ceph_monc_open_session);
 442
 443static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 444                                 struct ceph_msg *msg)
 445{
 446        struct ceph_client *client = monc->client;
 447        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
 448        void *p, *end;
 449
 450        mutex_lock(&monc->mutex);
 451
 452        dout("handle_monmap\n");
 453        p = msg->front.iov_base;
 454        end = p + msg->front.iov_len;
 455
 456        monmap = ceph_monmap_decode(p, end);
 457        if (IS_ERR(monmap)) {
 458                pr_err("problem decoding monmap, %d\n",
 459                       (int)PTR_ERR(monmap));
 460                goto out;
 461        }
 462
 463        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
 464                kfree(monmap);
 465                goto out;
 466        }
 467
 468        client->monc.monmap = monmap;
 469        kfree(old);
 470
 471        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
 472        client->have_fsid = true;
 473
 474out:
 475        mutex_unlock(&monc->mutex);
 476        wake_up_all(&client->auth_wq);
 477}
 478
 479/*
 480 * generic requests (currently statfs, mon_get_version)
 481 */
 482DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
 483
 484static void release_generic_request(struct kref *kref)
 485{
 486        struct ceph_mon_generic_request *req =
 487                container_of(kref, struct ceph_mon_generic_request, kref);
 488
 489        dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
 490             req->reply);
 491        WARN_ON(!RB_EMPTY_NODE(&req->node));
 492
 493        if (req->reply)
 494                ceph_msg_put(req->reply);
 495        if (req->request)
 496                ceph_msg_put(req->request);
 497
 498        kfree(req);
 499}
 500
 501static void put_generic_request(struct ceph_mon_generic_request *req)
 502{
 503        if (req)
 504                kref_put(&req->kref, release_generic_request);
 505}
 506
 507static void get_generic_request(struct ceph_mon_generic_request *req)
 508{
 509        kref_get(&req->kref);
 510}
 511
 512static struct ceph_mon_generic_request *
 513alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
 514{
 515        struct ceph_mon_generic_request *req;
 516
 517        req = kzalloc(sizeof(*req), gfp);
 518        if (!req)
 519                return NULL;
 520
 521        req->monc = monc;
 522        kref_init(&req->kref);
 523        RB_CLEAR_NODE(&req->node);
 524        init_completion(&req->completion);
 525
 526        dout("%s greq %p\n", __func__, req);
 527        return req;
 528}
 529
 530static void register_generic_request(struct ceph_mon_generic_request *req)
 531{
 532        struct ceph_mon_client *monc = req->monc;
 533
 534        WARN_ON(req->tid);
 535
 536        get_generic_request(req);
 537        req->tid = ++monc->last_tid;
 538        insert_generic_request(&monc->generic_request_tree, req);
 539}
 540
 541static void send_generic_request(struct ceph_mon_client *monc,
 542                                 struct ceph_mon_generic_request *req)
 543{
 544        WARN_ON(!req->tid);
 545
 546        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
 547        req->request->hdr.tid = cpu_to_le64(req->tid);
 548        ceph_con_send(&monc->con, ceph_msg_get(req->request));
 549}
 550
 551static void __finish_generic_request(struct ceph_mon_generic_request *req)
 552{
 553        struct ceph_mon_client *monc = req->monc;
 554
 555        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
 556        erase_generic_request(&monc->generic_request_tree, req);
 557
 558        ceph_msg_revoke(req->request);
 559        ceph_msg_revoke_incoming(req->reply);
 560}
 561
 562static void finish_generic_request(struct ceph_mon_generic_request *req)
 563{
 564        __finish_generic_request(req);
 565        put_generic_request(req);
 566}
 567
 568static void complete_generic_request(struct ceph_mon_generic_request *req)
 569{
 570        if (req->complete_cb)
 571                req->complete_cb(req);
 572        else
 573                complete_all(&req->completion);
 574        put_generic_request(req);
 575}
 576
 577static void cancel_generic_request(struct ceph_mon_generic_request *req)
 578{
 579        struct ceph_mon_client *monc = req->monc;
 580        struct ceph_mon_generic_request *lookup_req;
 581
 582        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
 583
 584        mutex_lock(&monc->mutex);
 585        lookup_req = lookup_generic_request(&monc->generic_request_tree,
 586                                            req->tid);
 587        if (lookup_req) {
 588                WARN_ON(lookup_req != req);
 589                finish_generic_request(req);
 590        }
 591
 592        mutex_unlock(&monc->mutex);
 593}
 594
 595static int wait_generic_request(struct ceph_mon_generic_request *req)
 596{
 597        int ret;
 598
 599        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
 600        ret = wait_for_completion_interruptible(&req->completion);
 601        if (ret)
 602                cancel_generic_request(req);
 603        else
 604                ret = req->result; /* completed */
 605
 606        return ret;
 607}
 608
 609static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 610                                         struct ceph_msg_header *hdr,
 611                                         int *skip)
 612{
 613        struct ceph_mon_client *monc = con->private;
 614        struct ceph_mon_generic_request *req;
 615        u64 tid = le64_to_cpu(hdr->tid);
 616        struct ceph_msg *m;
 617
 618        mutex_lock(&monc->mutex);
 619        req = lookup_generic_request(&monc->generic_request_tree, tid);
 620        if (!req) {
 621                dout("get_generic_reply %lld dne\n", tid);
 622                *skip = 1;
 623                m = NULL;
 624        } else {
 625                dout("get_generic_reply %lld got %p\n", tid, req->reply);
 626                *skip = 0;
 627                m = ceph_msg_get(req->reply);
 628                /*
 629                 * we don't need to track the connection reading into
 630                 * this reply because we only have one open connection
 631                 * at a time, ever.
 632                 */
 633        }
 634        mutex_unlock(&monc->mutex);
 635        return m;
 636}
 637
 638/*
 639 * statfs
 640 */
 641static void handle_statfs_reply(struct ceph_mon_client *monc,
 642                                struct ceph_msg *msg)
 643{
 644        struct ceph_mon_generic_request *req;
 645        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
 646        u64 tid = le64_to_cpu(msg->hdr.tid);
 647
 648        dout("%s msg %p tid %llu\n", __func__, msg, tid);
 649
 650        if (msg->front.iov_len != sizeof(*reply))
 651                goto bad;
 652
 653        mutex_lock(&monc->mutex);
 654        req = lookup_generic_request(&monc->generic_request_tree, tid);
 655        if (!req) {
 656                mutex_unlock(&monc->mutex);
 657                return;
 658        }
 659
 660        req->result = 0;
 661        *req->u.st = reply->st; /* struct */
 662        __finish_generic_request(req);
 663        mutex_unlock(&monc->mutex);
 664
 665        complete_generic_request(req);
 666        return;
 667
 668bad:
 669        pr_err("corrupt statfs reply, tid %llu\n", tid);
 670        ceph_msg_dump(msg);
 671}
 672
 673/*
 674 * Do a synchronous statfs().
 675 */
 676int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 677{
 678        struct ceph_mon_generic_request *req;
 679        struct ceph_mon_statfs *h;
 680        int ret = -ENOMEM;
 681
 682        req = alloc_generic_request(monc, GFP_NOFS);
 683        if (!req)
 684                goto out;
 685
 686        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
 687                                    true);
 688        if (!req->request)
 689                goto out;
 690
 691        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
 692        if (!req->reply)
 693                goto out;
 694
 695        req->u.st = buf;
 696
 697        mutex_lock(&monc->mutex);
 698        register_generic_request(req);
 699        /* fill out request */
 700        h = req->request->front.iov_base;
 701        h->monhdr.have_version = 0;
 702        h->monhdr.session_mon = cpu_to_le16(-1);
 703        h->monhdr.session_mon_tid = 0;
 704        h->fsid = monc->monmap->fsid;
 705        send_generic_request(monc, req);
 706        mutex_unlock(&monc->mutex);
 707
 708        ret = wait_generic_request(req);
 709out:
 710        put_generic_request(req);
 711        return ret;
 712}
 713EXPORT_SYMBOL(ceph_monc_do_statfs);
 714
 715static void handle_get_version_reply(struct ceph_mon_client *monc,
 716                                     struct ceph_msg *msg)
 717{
 718        struct ceph_mon_generic_request *req;
 719        u64 tid = le64_to_cpu(msg->hdr.tid);
 720        void *p = msg->front.iov_base;
 721        void *end = p + msg->front_alloc_len;
 722        u64 handle;
 723
 724        dout("%s msg %p tid %llu\n", __func__, msg, tid);
 725
 726        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
 727        handle = ceph_decode_64(&p);
 728        if (tid != 0 && tid != handle)
 729                goto bad;
 730
 731        mutex_lock(&monc->mutex);
 732        req = lookup_generic_request(&monc->generic_request_tree, handle);
 733        if (!req) {
 734                mutex_unlock(&monc->mutex);
 735                return;
 736        }
 737
 738        req->result = 0;
 739        req->u.newest = ceph_decode_64(&p);
 740        __finish_generic_request(req);
 741        mutex_unlock(&monc->mutex);
 742
 743        complete_generic_request(req);
 744        return;
 745
 746bad:
 747        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
 748        ceph_msg_dump(msg);
 749}
 750
 751static struct ceph_mon_generic_request *
 752__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
 753                        ceph_monc_callback_t cb, u64 private_data)
 754{
 755        struct ceph_mon_generic_request *req;
 756
 757        req = alloc_generic_request(monc, GFP_NOIO);
 758        if (!req)
 759                goto err_put_req;
 760
 761        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
 762                                    sizeof(u64) + sizeof(u32) + strlen(what),
 763                                    GFP_NOIO, true);
 764        if (!req->request)
 765                goto err_put_req;
 766
 767        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
 768                                  true);
 769        if (!req->reply)
 770                goto err_put_req;
 771
 772        req->complete_cb = cb;
 773        req->private_data = private_data;
 774
 775        mutex_lock(&monc->mutex);
 776        register_generic_request(req);
 777        {
 778                void *p = req->request->front.iov_base;
 779                void *const end = p + req->request->front_alloc_len;
 780
 781                ceph_encode_64(&p, req->tid); /* handle */
 782                ceph_encode_string(&p, end, what, strlen(what));
 783                WARN_ON(p != end);
 784        }
 785        send_generic_request(monc, req);
 786        mutex_unlock(&monc->mutex);
 787
 788        return req;
 789
 790err_put_req:
 791        put_generic_request(req);
 792        return ERR_PTR(-ENOMEM);
 793}
 794
 795/*
 796 * Send MMonGetVersion and wait for the reply.
 797 *
 798 * @what: one of "mdsmap", "osdmap" or "monmap"
 799 */
 800int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
 801                          u64 *newest)
 802{
 803        struct ceph_mon_generic_request *req;
 804        int ret;
 805
 806        req = __ceph_monc_get_version(monc, what, NULL, 0);
 807        if (IS_ERR(req))
 808                return PTR_ERR(req);
 809
 810        ret = wait_generic_request(req);
 811        if (!ret)
 812                *newest = req->u.newest;
 813
 814        put_generic_request(req);
 815        return ret;
 816}
 817EXPORT_SYMBOL(ceph_monc_get_version);
 818
 819/*
 820 * Send MMonGetVersion,
 821 *
 822 * @what: one of "mdsmap", "osdmap" or "monmap"
 823 */
 824int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
 825                                ceph_monc_callback_t cb, u64 private_data)
 826{
 827        struct ceph_mon_generic_request *req;
 828
 829        req = __ceph_monc_get_version(monc, what, cb, private_data);
 830        if (IS_ERR(req))
 831                return PTR_ERR(req);
 832
 833        put_generic_request(req);
 834        return 0;
 835}
 836EXPORT_SYMBOL(ceph_monc_get_version_async);
 837
 838static void handle_command_ack(struct ceph_mon_client *monc,
 839                               struct ceph_msg *msg)
 840{
 841        struct ceph_mon_generic_request *req;
 842        void *p = msg->front.iov_base;
 843        void *const end = p + msg->front_alloc_len;
 844        u64 tid = le64_to_cpu(msg->hdr.tid);
 845
 846        dout("%s msg %p tid %llu\n", __func__, msg, tid);
 847
 848        ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
 849                                                            sizeof(u32), bad);
 850        p += sizeof(struct ceph_mon_request_header);
 851
 852        mutex_lock(&monc->mutex);
 853        req = lookup_generic_request(&monc->generic_request_tree, tid);
 854        if (!req) {
 855                mutex_unlock(&monc->mutex);
 856                return;
 857        }
 858
 859        req->result = ceph_decode_32(&p);
 860        __finish_generic_request(req);
 861        mutex_unlock(&monc->mutex);
 862
 863        complete_generic_request(req);
 864        return;
 865
 866bad:
 867        pr_err("corrupt mon_command ack, tid %llu\n", tid);
 868        ceph_msg_dump(msg);
 869}
 870
 871int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
 872                            struct ceph_entity_addr *client_addr)
 873{
 874        struct ceph_mon_generic_request *req;
 875        struct ceph_mon_command *h;
 876        int ret = -ENOMEM;
 877        int len;
 878
 879        req = alloc_generic_request(monc, GFP_NOIO);
 880        if (!req)
 881                goto out;
 882
 883        req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
 884        if (!req->request)
 885                goto out;
 886
 887        req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
 888                                  true);
 889        if (!req->reply)
 890                goto out;
 891
 892        mutex_lock(&monc->mutex);
 893        register_generic_request(req);
 894        h = req->request->front.iov_base;
 895        h->monhdr.have_version = 0;
 896        h->monhdr.session_mon = cpu_to_le16(-1);
 897        h->monhdr.session_mon_tid = 0;
 898        h->fsid = monc->monmap->fsid;
 899        h->num_strs = cpu_to_le32(1);
 900        len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
 901                                 \"blacklistop\": \"add\", \
 902                                 \"addr\": \"%pISpc/%u\" }",
 903                      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
 904        h->str_len = cpu_to_le32(len);
 905        send_generic_request(monc, req);
 906        mutex_unlock(&monc->mutex);
 907
 908        ret = wait_generic_request(req);
 909out:
 910        put_generic_request(req);
 911        return ret;
 912}
 913EXPORT_SYMBOL(ceph_monc_blacklist_add);
 914
 915/*
 916 * Resend pending generic requests.
 917 */
 918static void __resend_generic_request(struct ceph_mon_client *monc)
 919{
 920        struct ceph_mon_generic_request *req;
 921        struct rb_node *p;
 922
 923        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
 924                req = rb_entry(p, struct ceph_mon_generic_request, node);
 925                ceph_msg_revoke(req->request);
 926                ceph_msg_revoke_incoming(req->reply);
 927                ceph_con_send(&monc->con, ceph_msg_get(req->request));
 928        }
 929}
 930
 931/*
 932 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 933 * renew/retry subscription as needed (in case it is timing out, or we
 934 * got an ENOMEM).  And keep the monitor connection alive.
 935 */
 936static void delayed_work(struct work_struct *work)
 937{
 938        struct ceph_mon_client *monc =
 939                container_of(work, struct ceph_mon_client, delayed_work.work);
 940
 941        dout("monc delayed_work\n");
 942        mutex_lock(&monc->mutex);
 943        if (monc->hunting) {
 944                dout("%s continuing hunt\n", __func__);
 945                reopen_session(monc);
 946        } else {
 947                int is_auth = ceph_auth_is_authenticated(monc->auth);
 948                if (ceph_con_keepalive_expired(&monc->con,
 949                                               CEPH_MONC_PING_TIMEOUT)) {
 950                        dout("monc keepalive timeout\n");
 951                        is_auth = 0;
 952                        reopen_session(monc);
 953                }
 954
 955                if (!monc->hunting) {
 956                        ceph_con_keepalive(&monc->con);
 957                        __validate_auth(monc);
 958                }
 959
 960                if (is_auth) {
 961                        unsigned long now = jiffies;
 962
 963                        dout("%s renew subs? now %lu renew after %lu\n",
 964                             __func__, now, monc->sub_renew_after);
 965                        if (time_after_eq(now, monc->sub_renew_after))
 966                                __send_subscribe(monc);
 967                }
 968        }
 969        __schedule_delayed(monc);
 970        mutex_unlock(&monc->mutex);
 971}
 972
 973/*
 974 * On startup, we build a temporary monmap populated with the IPs
 975 * provided by mount(2).
 976 */
 977static int build_initial_monmap(struct ceph_mon_client *monc)
 978{
 979        struct ceph_options *opt = monc->client->options;
 980        struct ceph_entity_addr *mon_addr = opt->mon_addr;
 981        int num_mon = opt->num_mon;
 982        int i;
 983
 984        /* build initial monmap */
 985        monc->monmap = kzalloc(sizeof(*monc->monmap) +
 986                               num_mon*sizeof(monc->monmap->mon_inst[0]),
 987                               GFP_KERNEL);
 988        if (!monc->monmap)
 989                return -ENOMEM;
 990        for (i = 0; i < num_mon; i++) {
 991                monc->monmap->mon_inst[i].addr = mon_addr[i];
 992                monc->monmap->mon_inst[i].addr.nonce = 0;
 993                monc->monmap->mon_inst[i].name.type =
 994                        CEPH_ENTITY_TYPE_MON;
 995                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
 996        }
 997        monc->monmap->num_mon = num_mon;
 998        return 0;
 999}
1000
1001int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1002{
1003        int err = 0;
1004
1005        dout("init\n");
1006        memset(monc, 0, sizeof(*monc));
1007        monc->client = cl;
1008        monc->monmap = NULL;
1009        mutex_init(&monc->mutex);
1010
1011        err = build_initial_monmap(monc);
1012        if (err)
1013                goto out;
1014
1015        /* connection */
1016        /* authentication */
1017        monc->auth = ceph_auth_init(cl->options->name,
1018                                    cl->options->key);
1019        if (IS_ERR(monc->auth)) {
1020                err = PTR_ERR(monc->auth);
1021                goto out_monmap;
1022        }
1023        monc->auth->want_keys =
1024                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1025                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1026
1027        /* msgs */
1028        err = -ENOMEM;
1029        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1030                                     sizeof(struct ceph_mon_subscribe_ack),
1031                                     GFP_NOFS, true);
1032        if (!monc->m_subscribe_ack)
1033                goto out_auth;
1034
1035        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, GFP_NOFS,
1036                                         true);
1037        if (!monc->m_subscribe)
1038                goto out_subscribe_ack;
1039
1040        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
1041                                          true);
1042        if (!monc->m_auth_reply)
1043                goto out_subscribe;
1044
1045        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
1046        monc->pending_auth = 0;
1047        if (!monc->m_auth)
1048                goto out_auth_reply;
1049
1050        ceph_con_init(&monc->con, monc, &mon_con_ops,
1051                      &monc->client->msgr);
1052
1053        monc->cur_mon = -1;
1054        monc->had_a_connection = false;
1055        monc->hunt_mult = 1;
1056
1057        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1058        monc->generic_request_tree = RB_ROOT;
1059        monc->last_tid = 0;
1060
1061        monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1062
1063        return 0;
1064
1065out_auth_reply:
1066        ceph_msg_put(monc->m_auth_reply);
1067out_subscribe:
1068        ceph_msg_put(monc->m_subscribe);
1069out_subscribe_ack:
1070        ceph_msg_put(monc->m_subscribe_ack);
1071out_auth:
1072        ceph_auth_destroy(monc->auth);
1073out_monmap:
1074        kfree(monc->monmap);
1075out:
1076        return err;
1077}
1078EXPORT_SYMBOL(ceph_monc_init);
1079
1080void ceph_monc_stop(struct ceph_mon_client *monc)
1081{
1082        dout("stop\n");
1083        cancel_delayed_work_sync(&monc->delayed_work);
1084
1085        mutex_lock(&monc->mutex);
1086        __close_session(monc);
1087        monc->cur_mon = -1;
1088        mutex_unlock(&monc->mutex);
1089
1090        /*
1091         * flush msgr queue before we destroy ourselves to ensure that:
1092         *  - any work that references our embedded con is finished.
1093         *  - any osd_client or other work that may reference an authorizer
1094         *    finishes before we shut down the auth subsystem.
1095         */
1096        ceph_msgr_flush();
1097
1098        ceph_auth_destroy(monc->auth);
1099
1100        WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1101
1102        ceph_msg_put(monc->m_auth);
1103        ceph_msg_put(monc->m_auth_reply);
1104        ceph_msg_put(monc->m_subscribe);
1105        ceph_msg_put(monc->m_subscribe_ack);
1106
1107        kfree(monc->monmap);
1108}
1109EXPORT_SYMBOL(ceph_monc_stop);
1110
1111static void finish_hunting(struct ceph_mon_client *monc)
1112{
1113        if (monc->hunting) {
1114                dout("%s found mon%d\n", __func__, monc->cur_mon);
1115                monc->hunting = false;
1116                monc->had_a_connection = true;
1117                monc->hunt_mult /= 2; /* reduce by 50% */
1118                if (monc->hunt_mult < 1)
1119                        monc->hunt_mult = 1;
1120        }
1121}
1122
1123static void handle_auth_reply(struct ceph_mon_client *monc,
1124                              struct ceph_msg *msg)
1125{
1126        int ret;
1127        int was_auth = 0;
1128
1129        mutex_lock(&monc->mutex);
1130        was_auth = ceph_auth_is_authenticated(monc->auth);
1131        monc->pending_auth = 0;
1132        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1133                                     msg->front.iov_len,
1134                                     monc->m_auth->front.iov_base,
1135                                     monc->m_auth->front_alloc_len);
1136        if (ret > 0) {
1137                __send_prepared_auth_request(monc, ret);
1138                goto out;
1139        }
1140
1141        finish_hunting(monc);
1142
1143        if (ret < 0) {
1144                monc->client->auth_err = ret;
1145        } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
1146                dout("authenticated, starting session\n");
1147
1148                monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1149                monc->client->msgr.inst.name.num =
1150                                        cpu_to_le64(monc->auth->global_id);
1151
1152                __send_subscribe(monc);
1153                __resend_generic_request(monc);
1154
1155                pr_info("mon%d %s session established\n", monc->cur_mon,
1156                        ceph_pr_addr(&monc->con.peer_addr.in_addr));
1157        }
1158
1159out:
1160        mutex_unlock(&monc->mutex);
1161        if (monc->client->auth_err < 0)
1162                wake_up_all(&monc->client->auth_wq);
1163}
1164
1165static int __validate_auth(struct ceph_mon_client *monc)
1166{
1167        int ret;
1168
1169        if (monc->pending_auth)
1170                return 0;
1171
1172        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1173                              monc->m_auth->front_alloc_len);
1174        if (ret <= 0)
1175                return ret; /* either an error, or no need to authenticate */
1176        __send_prepared_auth_request(monc, ret);
1177        return 0;
1178}
1179
1180int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1181{
1182        int ret;
1183
1184        mutex_lock(&monc->mutex);
1185        ret = __validate_auth(monc);
1186        mutex_unlock(&monc->mutex);
1187        return ret;
1188}
1189EXPORT_SYMBOL(ceph_monc_validate_auth);
1190
1191/*
1192 * handle incoming message
1193 */
1194static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1195{
1196        struct ceph_mon_client *monc = con->private;
1197        int type = le16_to_cpu(msg->hdr.type);
1198
1199        if (!monc)
1200                return;
1201
1202        switch (type) {
1203        case CEPH_MSG_AUTH_REPLY:
1204                handle_auth_reply(monc, msg);
1205                break;
1206
1207        case CEPH_MSG_MON_SUBSCRIBE_ACK:
1208                handle_subscribe_ack(monc, msg);
1209                break;
1210
1211        case CEPH_MSG_STATFS_REPLY:
1212                handle_statfs_reply(monc, msg);
1213                break;
1214
1215        case CEPH_MSG_MON_GET_VERSION_REPLY:
1216                handle_get_version_reply(monc, msg);
1217                break;
1218
1219        case CEPH_MSG_MON_COMMAND_ACK:
1220                handle_command_ack(monc, msg);
1221                break;
1222
1223        case CEPH_MSG_MON_MAP:
1224                ceph_monc_handle_map(monc, msg);
1225                break;
1226
1227        case CEPH_MSG_OSD_MAP:
1228                ceph_osdc_handle_map(&monc->client->osdc, msg);
1229                break;
1230
1231        default:
1232                /* can the chained handler handle it? */
1233                if (monc->client->extra_mon_dispatch &&
1234                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1235                        break;
1236                        
1237                pr_err("received unknown message type %d %s\n", type,
1238                       ceph_msg_type_name(type));
1239        }
1240        ceph_msg_put(msg);
1241}
1242
1243/*
1244 * Allocate memory for incoming message
1245 */
1246static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1247                                      struct ceph_msg_header *hdr,
1248                                      int *skip)
1249{
1250        struct ceph_mon_client *monc = con->private;
1251        int type = le16_to_cpu(hdr->type);
1252        int front_len = le32_to_cpu(hdr->front_len);
1253        struct ceph_msg *m = NULL;
1254
1255        *skip = 0;
1256
1257        switch (type) {
1258        case CEPH_MSG_MON_SUBSCRIBE_ACK:
1259                m = ceph_msg_get(monc->m_subscribe_ack);
1260                break;
1261        case CEPH_MSG_STATFS_REPLY:
1262        case CEPH_MSG_MON_COMMAND_ACK:
1263                return get_generic_reply(con, hdr, skip);
1264        case CEPH_MSG_AUTH_REPLY:
1265                m = ceph_msg_get(monc->m_auth_reply);
1266                break;
1267        case CEPH_MSG_MON_GET_VERSION_REPLY:
1268                if (le64_to_cpu(hdr->tid) != 0)
1269                        return get_generic_reply(con, hdr, skip);
1270
1271                /*
1272                 * Older OSDs don't set reply tid even if the orignal
1273                 * request had a non-zero tid.  Workaround this weirdness
1274                 * by falling through to the allocate case.
1275                 */
1276        case CEPH_MSG_MON_MAP:
1277        case CEPH_MSG_MDS_MAP:
1278        case CEPH_MSG_OSD_MAP:
1279        case CEPH_MSG_FS_MAP_USER:
1280                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1281                if (!m)
1282                        return NULL;    /* ENOMEM--return skip == 0 */
1283                break;
1284        }
1285
1286        if (!m) {
1287                pr_info("alloc_msg unknown type %d\n", type);
1288                *skip = 1;
1289        } else if (front_len > m->front_alloc_len) {
1290                pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1291                        front_len, m->front_alloc_len,
1292                        (unsigned int)con->peer_name.type,
1293                        le64_to_cpu(con->peer_name.num));
1294                ceph_msg_put(m);
1295                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1296        }
1297
1298        return m;
1299}
1300
1301/*
1302 * If the monitor connection resets, pick a new monitor and resubmit
1303 * any pending requests.
1304 */
1305static void mon_fault(struct ceph_connection *con)
1306{
1307        struct ceph_mon_client *monc = con->private;
1308
1309        mutex_lock(&monc->mutex);
1310        dout("%s mon%d\n", __func__, monc->cur_mon);
1311        if (monc->cur_mon >= 0) {
1312                if (!monc->hunting) {
1313                        dout("%s hunting for new mon\n", __func__);
1314                        reopen_session(monc);
1315                        __schedule_delayed(monc);
1316                } else {
1317                        dout("%s already hunting\n", __func__);
1318                }
1319        }
1320        mutex_unlock(&monc->mutex);
1321}
1322
/*
 * The connection struct needs no reference counting here: every reference
 * comes from the messenger workqueue, and that workqueue is drained before
 * the mon_client is destroyed.  The get/put callbacks are therefore no-ops.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	/* hand the connection back without taking a reference */
	return con;
}

static void con_put(struct ceph_connection *con)
{
	/* intentionally empty: there is no reference to drop */
}
1336
1337static const struct ceph_connection_operations mon_con_ops = {
1338        .get = con_get,
1339        .put = con_put,
1340        .dispatch = dispatch,
1341        .fault = mon_fault,
1342        .alloc_msg = mon_alloc_msg,
1343};
1344