linux/net/ceph/mon_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/decode.h>

#include <linux/ceph/auth.h>

/*
 * Interact with the Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */
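
/*
 * Illustrative usage sketch (a rough sketch, not a spec; the "client"
 * pointer is assumed for illustration only).  A libceph client typically
 * drives this file as:
 *
 *     err = ceph_monc_init(&client->monc, client);
 *     if (!err)
 *             err = ceph_monc_open_session(&client->monc);
 *     ...
 *     ceph_monc_stop(&client->monc);        on shutdown
 */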

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
        struct ceph_monmap *m = NULL;
        int i, err = -EINVAL;
        struct ceph_fsid fsid;
        u32 epoch, num_mon;
        u16 version;
        u32 len;

        ceph_decode_32_safe(&p, end, len, bad);
        ceph_decode_need(&p, end, len, bad);

        dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

        ceph_decode_16_safe(&p, end, version, bad);

        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(&p);

        num_mon = ceph_decode_32(&p);
        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

        if (num_mon >= CEPH_MAX_MON)
                goto bad;
        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
        if (m == NULL)
                return ERR_PTR(-ENOMEM);
        m->fsid = fsid;
        m->epoch = epoch;
        m->num_mon = num_mon;
        ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
        for (i = 0; i < num_mon; i++)
                ceph_decode_addr(&m->mon_inst[i].addr);

        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
             m->num_mon);
        for (i = 0; i < m->num_mon; i++)
                dout("monmap_decode  mon%d is %s\n", i,
                     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
        return m;

bad:
        dout("monmap_decode failed with %d\n", err);
        kfree(m);
        return ERR_PTR(err);
}
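
/*
 * For reference, the monmap blob decoded above is laid out as (derived
 * from the decode sequence in ceph_monmap_decode()):
 *
 *     u32 len                       length of the payload that follows
 *     u16 version
 *     ceph_fsid fsid
 *     u32 epoch
 *     u32 num_mon
 *     ceph_entity_inst mon_inst[num_mon]
 */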

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
        int i;

        for (i = 0; i < m->num_mon; i++)
                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
                        return 1;
        return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
        ceph_con_revoke(monc->con, monc->m_auth);
        ceph_msg_get(monc->m_auth);  /* keep our ref */
        ceph_con_send(monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        if (monc->con) {
                dout("__close_session closing mon%d\n", monc->cur_mon);
                ceph_con_revoke(monc->con, monc->m_auth);
                ceph_con_close(monc->con);
                monc->cur_mon = -1;
                monc->pending_auth = 0;
                ceph_auth_reset(monc->auth);
        }
}

/*
 * Open a session with a (new) monitor.
 */
static int __open_session(struct ceph_mon_client *monc)
{
        char r;
        int ret;

        if (monc->cur_mon < 0) {
                get_random_bytes(&r, 1);
                monc->cur_mon = r % monc->monmap->num_mon;
                dout("open_session num=%d r=%d -> mon%d\n",
                     monc->monmap->num_mon, r, monc->cur_mon);
                monc->sub_sent = 0;
                monc->sub_renew_after = jiffies;  /* i.e., expired */
                monc->want_next_osdmap = !!monc->want_next_osdmap;

                dout("open_session mon%d opening\n", monc->cur_mon);
                monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
                monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
                ceph_con_open(monc->con,
                              &monc->monmap->mon_inst[monc->cur_mon].addr);

                /* initiate authentication handshake */
                ret = ceph_auth_build_hello(monc->auth,
                                            monc->m_auth->front.iov_base,
                                            monc->m_auth->front_max);
                __send_prepared_auth_request(monc, ret);
        } else {
                dout("open_session mon%d already open\n", monc->cur_mon);
        }
        return 0;
}

static bool __sub_expired(struct ceph_mon_client *monc)
{
        return time_after_eq(jiffies, monc->sub_renew_after);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
        unsigned delay;

        if (monc->cur_mon < 0 || __sub_expired(monc))
                delay = 10 * HZ;
        else
                delay = 20 * HZ;
        dout("__schedule_delayed after %u\n", delay);
        schedule_delayed_work(&monc->delayed_work, delay);
}

/*
 * Send subscribe request for mdsmap and/or osdmap.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
        dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
             (unsigned)monc->sub_sent, __sub_expired(monc),
             monc->want_next_osdmap);
        if ((__sub_expired(monc) && !monc->sub_sent) ||
            monc->want_next_osdmap == 1) {
                struct ceph_msg *msg = monc->m_subscribe;
                struct ceph_mon_subscribe_item *i;
                void *p, *end;
                int num;

                p = msg->front.iov_base;
                end = p + msg->front_max;

                num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
                ceph_encode_32(&p, num);

                if (monc->want_next_osdmap) {
                        dout("__send_subscribe to 'osdmap' %u\n",
                             (unsigned)monc->have_osdmap);
                        ceph_encode_string(&p, end, "osdmap", 6);
                        i = p;
                        i->have = cpu_to_le64(monc->have_osdmap);
                        i->onetime = 1;
                        p += sizeof(*i);
                        monc->want_next_osdmap = 2;  /* requested */
                }
                if (monc->want_mdsmap) {
                        dout("__send_subscribe to 'mdsmap' %u+\n",
                             (unsigned)monc->have_mdsmap);
                        ceph_encode_string(&p, end, "mdsmap", 6);
                        i = p;
                        i->have = cpu_to_le64(monc->have_mdsmap);
                        i->onetime = 0;
                        p += sizeof(*i);
                }
                ceph_encode_string(&p, end, "monmap", 6);
                i = p;
                i->have = 0;
                i->onetime = 0;
                p += sizeof(*i);

                msg->front.iov_len = p - msg->front.iov_base;
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                ceph_con_revoke(monc->con, msg);
                ceph_con_send(monc->con, ceph_msg_get(msg));

                monc->sub_sent = jiffies | 1;  /* never 0 */
        }
}
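
/*
 * For reference, the subscribe payload built above encodes (derived from
 * the encoding in __send_subscribe()):
 *
 *     u32 num                       number of subscriptions (monmap, plus
 *                                   osdmap and/or mdsmap when wanted)
 *     then, for each subscription:
 *         string name               "osdmap", "mdsmap" or "monmap"
 *         ceph_mon_subscribe_item   the version we already have, plus a
 *                                   onetime flag
 */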

static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                pr_info("mon%d %s session established\n",
                        monc->cur_mon,
                        ceph_pr_addr(&monc->con->peer_addr.in_addr));
                monc->hunting = false;
        }
        dout("handle_subscribe_ack after %d seconds\n", seconds);
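        /*
         * Renew roughly halfway through the duration the monitor granted
         * us, so the subscription does not lapse before we refresh it.
         */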
        monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
        monc->sub_sent = 0;
        mutex_unlock(&monc->mutex);
        return;
bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
        mutex_lock(&monc->mutex);
        monc->have_mdsmap = got;
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
        mutex_lock(&monc->mutex);
        monc->have_osdmap = got;
        monc->want_next_osdmap = 0;
        mutex_unlock(&monc->mutex);
        return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
        dout("request_next_osdmap have %u\n", monc->have_osdmap);
        mutex_lock(&monc->mutex);
        if (!monc->want_next_osdmap)
                monc->want_next_osdmap = 1;
        if (monc->want_next_osdmap < 2)
                __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
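
/*
 * Note on want_next_osdmap, as used above and in __send_subscribe():
 * 0 means no interest, 1 means a newer osdmap is wanted but not yet
 * requested, and 2 means the subscription has been sent and we are
 * waiting for the monitor to deliver the map.
 */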

/*
 * Open a session with a monitor, allocating the connection if needed.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        if (!monc->con) {
                monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
                if (!monc->con)
                        return -ENOMEM;
                ceph_con_init(monc->client->msgr, monc->con);
                monc->con->private = monc;
                monc->con->ops = &mon_con_ops;
        }

        mutex_lock(&monc->mutex);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

/*
 * The monitor responds with a mount ack to indicate mount success.  The
 * included client ticket allows the client to talk to MDSs and OSDs.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(p, end);
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                goto out;
        }

        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        client->monc.monmap = monmap;
        kfree(old);

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}

/*
 * generic requests (e.g., statfs, poolop)
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
        struct ceph_mon_client *monc, u64 tid)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *n = monc->generic_request_tree.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_mon_generic_request, node);
                if (tid < req->tid)
                        n = n->rb_left;
                else if (tid > req->tid)
                        n = n->rb_right;
                else
                        return req;
        }
        return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
                            struct ceph_mon_generic_request *new)
{
        struct rb_node **p = &monc->generic_request_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_mon_generic_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_mon_generic_request, node);
                if (new->tid < req->tid)
                        p = &(*p)->rb_left;
                else if (new->tid > req->tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->node, parent, p);
        rb_insert_color(&new->node, &monc->generic_request_tree);
}

static void release_generic_request(struct kref *kref)
{
        struct ceph_mon_generic_request *req =
                container_of(kref, struct ceph_mon_generic_request, kref);

        if (req->reply)
                ceph_msg_put(req->reply);
        if (req->request)
                ceph_msg_put(req->request);

        kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
        kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
                                         struct ceph_msg_header *hdr,
                                         int *skip)
{
        struct ceph_mon_client *monc = con->private;
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(hdr->tid);
        struct ceph_msg *m;

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (!req) {
                dout("get_generic_reply %lld dne\n", tid);
                *skip = 1;
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
                 * this reply because we only have one open connection
                 * at a time, ever.
                 */
        }
        mutex_unlock(&monc->mutex);
        return m;
}

static int do_generic_request(struct ceph_mon_client *monc,
                              struct ceph_mon_generic_request *req)
{
        int err;

        /* register request */
        mutex_lock(&monc->mutex);
        req->tid = ++monc->last_tid;
        req->request->hdr.tid = cpu_to_le64(req->tid);
        __insert_generic_request(monc, req);
        monc->num_generic_requests++;
        ceph_con_send(monc->con, ceph_msg_get(req->request));
        mutex_unlock(&monc->mutex);

        err = wait_for_completion_interruptible(&req->completion);

        mutex_lock(&monc->mutex);
        rb_erase(&req->node, &monc->generic_request_tree);
        monc->num_generic_requests--;
        mutex_unlock(&monc->mutex);

        if (!err)
                err = req->result;
        return err;
}
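
/*
 * In short, do_generic_request() registers the request in the tid-indexed
 * rbtree, sends it to the monitor, sleeps until the reply handler
 * completes it (or the wait is interrupted), then unregisters it and
 * returns either the wait error or the decoded result.
 */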

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        if (msg->front.iov_len != sizeof(*reply))
                goto bad;
        dout("handle_statfs_reply %p tid %llu\n", msg, tid);

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (req) {
                *(struct ceph_statfs *)req->buf = reply->st;
                req->result = 0;
                get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
        if (req) {
                complete_all(&req->completion);
                put_generic_request(req);
        }
        return;

bad:
        pr_err("corrupt generic reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int err;

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (!req)
                return -ENOMEM;

        kref_init(&req->kref);
        req->buf = buf;
        req->buf_len = sizeof(*buf);
        init_completion(&req->completion);

        err = -ENOMEM;
        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
        if (!req->request)
                goto out;
        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
        if (!req->reply)
                goto out;

        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;

        err = do_generic_request(monc, req);

out:
        kref_put(&req->kref, release_generic_request);
        return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
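
/*
 * Illustrative caller sketch (the surrounding variables are assumed for
 * illustration only): a filesystem statfs handler might do
 *
 *     struct ceph_statfs st;
 *     int err = ceph_monc_do_statfs(&client->monc, &st);
 *
 *     if (!err)
 *             total_kb = le64_to_cpu(st.kb);
 *
 * and translate the remaining ceph_statfs fields into its own reply.
 */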

/*
 * pool ops
 */
static int get_poolop_reply_buf(const char *src, size_t src_len,
                                char *dst, size_t dst_len)
{
        u32 buf_len;

        if (src_len != sizeof(u32) + dst_len)
                return -EINVAL;

        buf_len = le32_to_cpu(*(u32 *)src);
        if (buf_len != dst_len)
                return -EINVAL;

        memcpy(dst, src + sizeof(u32), dst_len);
        return 0;
}

static void handle_poolop_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        if (msg->front.iov_len < sizeof(*reply))
                goto bad;
        dout("handle_poolop_reply %p tid %llu\n", msg, tid);

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (req) {
                if (req->buf_len &&
                    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
                                     msg->front.iov_len - sizeof(*reply),
                                     req->buf, req->buf_len) < 0) {
                        mutex_unlock(&monc->mutex);
                        goto bad;
                }
                req->result = le32_to_cpu(reply->reply_code);
                get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
        if (req) {
                complete(&req->completion);
                put_generic_request(req);
        }
        return;

bad:
        pr_err("corrupt generic reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Do a synchronous pool op.
 */
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
                        u32 pool, u64 snapid,
                        char *buf, int len)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_poolop *h;
        int err;

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (!req)
                return -ENOMEM;

        kref_init(&req->kref);
        req->buf = buf;
        req->buf_len = len;
        init_completion(&req->completion);

        err = -ENOMEM;
        req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
        if (!req->request)
                goto out;
        req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
        if (!req->reply)
                goto out;

        /* fill out request */
        req->request->hdr.version = cpu_to_le16(2);
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        h->pool = cpu_to_le32(pool);
        h->op = cpu_to_le32(op);
        h->auid = 0;
        h->snapid = cpu_to_le64(snapid);
        h->name_len = 0;

        err = do_generic_request(monc, req);

out:
        kref_put(&req->kref, release_generic_request);
        return err;
}

int ceph_monc_create_snapid(struct ceph_mon_client *monc,
                            u32 pool, u64 *snapid)
{
        return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
                                   pool, 0, (char *)snapid, sizeof(*snapid));
}
EXPORT_SYMBOL(ceph_monc_create_snapid);

int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
                            u32 pool, u64 snapid)
{
        return ceph_monc_do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
                                   pool, snapid, NULL, 0);
}

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *p;

        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_mon_generic_request, node);
                ceph_con_revoke(monc->con, req->request);
                ceph_con_send(monc->con, ceph_msg_get(req->request));
        }
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
        struct ceph_mon_client *monc =
                container_of(work, struct ceph_mon_client, delayed_work.work);

        dout("monc delayed_work\n");
        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                __close_session(monc);
                __open_session(monc);  /* continue hunting */
        } else {
                ceph_con_keepalive(monc->con);

                __validate_auth(monc);

                if (monc->auth->ops->is_authenticated(monc->auth))
                        __send_subscribe(monc);
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
        struct ceph_options *opt = monc->client->options;
        struct ceph_entity_addr *mon_addr = opt->mon_addr;
        int num_mon = opt->num_mon;
        int i;

        /* build initial monmap */
        monc->monmap = kzalloc(sizeof(*monc->monmap) +
                               num_mon*sizeof(monc->monmap->mon_inst[0]),
                               GFP_KERNEL);
        if (!monc->monmap)
                return -ENOMEM;
        for (i = 0; i < num_mon; i++) {
                monc->monmap->mon_inst[i].addr = mon_addr[i];
                monc->monmap->mon_inst[i].addr.nonce = 0;
                monc->monmap->mon_inst[i].name.type =
                        CEPH_ENTITY_TYPE_MON;
                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
        }
        monc->monmap->num_mon = num_mon;
        monc->have_fsid = false;
        return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
        int err = 0;

        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        monc->monmap = NULL;
        mutex_init(&monc->mutex);

        err = build_initial_monmap(monc);
        if (err)
                goto out;

        monc->con = NULL;

        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name,
                                    cl->options->key);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
                goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

        /* msgs */
        err = -ENOMEM;
        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
                                     sizeof(struct ceph_mon_subscribe_ack),
                                     GFP_NOFS);
        if (!monc->m_subscribe_ack)
                goto out_monmap;

        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
        if (!monc->m_subscribe)
                goto out_subscribe_ack;

        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
        if (!monc->m_auth_reply)
                goto out_subscribe;

        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
        monc->pending_auth = 0;
        if (!monc->m_auth)
                goto out_auth_reply;

        monc->cur_mon = -1;
        monc->hunting = true;
        monc->sub_renew_after = jiffies;
        monc->sub_sent = 0;

        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->num_generic_requests = 0;
        monc->last_tid = 0;

        monc->have_mdsmap = 0;
        monc->have_osdmap = 0;
        monc->want_next_osdmap = 1;
        return 0;

out_auth_reply:
        ceph_msg_put(monc->m_auth_reply);
out_subscribe:
        ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
        ceph_msg_put(monc->m_subscribe_ack);
out_monmap:
        kfree(monc->monmap);
out:
        return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&monc->delayed_work);

        mutex_lock(&monc->mutex);
        __close_session(monc);
        if (monc->con) {
                monc->con->private = NULL;
                monc->con->ops->put(monc->con);
                monc->con = NULL;
        }
        mutex_unlock(&monc->mutex);

        ceph_auth_destroy(monc->auth);

        ceph_msg_put(monc->m_auth);
        ceph_msg_put(monc->m_auth_reply);
        ceph_msg_put(monc->m_subscribe);
        ceph_msg_put(monc->m_subscribe_ack);

        kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
{
        int ret;
        int was_auth = 0;

        mutex_lock(&monc->mutex);
        if (monc->auth->ops)
                was_auth = monc->auth->ops->is_authenticated(monc->auth);
        monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_max);
        if (ret < 0) {
                monc->client->auth_err = ret;
                wake_up_all(&monc->client->auth_wq);
        } else if (ret > 0) {
                __send_prepared_auth_request(monc, ret);
        } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");

                monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
                monc->client->msgr->inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);

                __send_subscribe(monc);
                __resend_generic_request(monc);
        }
        mutex_unlock(&monc->mutex);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        if (monc->pending_auth)
                return 0;

        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
                              monc->m_auth->front_max);
        if (ret <= 0)
                return ret; /* either an error, or no need to authenticate */
        __send_prepared_auth_request(monc, ret);
        return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        mutex_lock(&monc->mutex);
        ret = __validate_auth(monc);
        mutex_unlock(&monc->mutex);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(msg->hdr.type);

        if (!monc)
                return;

        switch (type) {
        case CEPH_MSG_AUTH_REPLY:
                handle_auth_reply(monc, msg);
                break;

        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                handle_subscribe_ack(monc, msg);
                break;

        case CEPH_MSG_STATFS_REPLY:
                handle_statfs_reply(monc, msg);
                break;

        case CEPH_MSG_POOLOP_REPLY:
                handle_poolop_reply(monc, msg);
                break;

        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;

        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(&monc->client->osdc, msg);
                break;

        default:
                /* can the chained handler handle it? */
                if (monc->client->extra_mon_dispatch &&
                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
                        break;

                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
        ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr,
                                      int *skip)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        struct ceph_msg *m = NULL;

        *skip = 0;

        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
        case CEPH_MSG_POOLOP_REPLY:
        case CEPH_MSG_STATFS_REPLY:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
                m = ceph_msg_get(monc->m_auth_reply);
                break;
        case CEPH_MSG_MON_MAP:
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP:
                m = ceph_msg_new(type, front_len, GFP_NOFS);
                break;
        }

        if (!m) {
                pr_info("alloc_msg unknown type %d\n", type);
                *skip = 1;
        }
        return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
        struct ceph_mon_client *monc = con->private;

        if (!monc)
                return;

        dout("mon_fault\n");
        mutex_lock(&monc->mutex);
        if (!con->private)
                goto out;

        if (monc->con && !monc->hunting)
                pr_info("mon%d %s session lost, "
                        "hunting for new mon\n", monc->cur_mon,
                        ceph_pr_addr(&monc->con->peer_addr.in_addr));

        __close_session(monc);
        if (!monc->hunting) {
                /* start hunting */
                monc->hunting = true;
                __open_session(monc);
        } else {
                /* already hunting, let's wait a bit */
                __schedule_delayed(monc);
        }
out:
        mutex_unlock(&monc->mutex);
}

static const struct ceph_connection_operations mon_con_ops = {
        .get = ceph_con_get,
        .put = ceph_con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
};