linux/net/ceph/osdmap.c
<<
>>
Prefs
   1
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/module.h>
   5#include <linux/slab.h>
   6#include <asm/div64.h>
   7
   8#include <linux/ceph/libceph.h>
   9#include <linux/ceph/osdmap.h>
  10#include <linux/ceph/decode.h>
  11#include <linux/crush/hash.h>
  12#include <linux/crush/mapper.h>
  13
  14char *ceph_osdmap_state_str(char *str, int len, int state)
  15{
  16        if (!len)
  17                return str;
  18
  19        if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
  20                snprintf(str, len, "exists, up");
  21        else if (state & CEPH_OSD_EXISTS)
  22                snprintf(str, len, "exists");
  23        else if (state & CEPH_OSD_UP)
  24                snprintf(str, len, "up");
  25        else
  26                snprintf(str, len, "doesn't exist");
  27
  28        return str;
  29}
  30
  31/* maps */
  32
  /*
 * Return the number of bits needed to represent @t, i.e. the position
 * of its highest set bit counted from 1.  Returns 0 for t == 0.
 */
static int calc_bits_of(unsigned int t)
{
        int nbits;

        for (nbits = 0; t != 0; t >>= 1)
                nbits++;

        return nbits;
}
  42
  43/*
  44 * the foo_mask is the smallest value 2^n-1 that is >= foo.
  45 */
  46static void calc_pg_masks(struct ceph_pg_pool_info *pi)
  47{
  48        pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
  49        pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
  50}
  51
  52/*
  53 * decode crush map
  54 */
  55static int crush_decode_uniform_bucket(void **p, void *end,
  56                                       struct crush_bucket_uniform *b)
  57{
  58        dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
  59        ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
  60        b->item_weight = ceph_decode_32(p);
  61        return 0;
  62bad:
  63        return -EINVAL;
  64}
  65
  66static int crush_decode_list_bucket(void **p, void *end,
  67                                    struct crush_bucket_list *b)
  68{
  69        int j;
  70        dout("crush_decode_list_bucket %p to %p\n", *p, end);
  71        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
  72        if (b->item_weights == NULL)
  73                return -ENOMEM;
  74        b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
  75        if (b->sum_weights == NULL)
  76                return -ENOMEM;
  77        ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
  78        for (j = 0; j < b->h.size; j++) {
  79                b->item_weights[j] = ceph_decode_32(p);
  80                b->sum_weights[j] = ceph_decode_32(p);
  81        }
  82        return 0;
  83bad:
  84        return -EINVAL;
  85}
  86
  87static int crush_decode_tree_bucket(void **p, void *end,
  88                                    struct crush_bucket_tree *b)
  89{
  90        int j;
  91        dout("crush_decode_tree_bucket %p to %p\n", *p, end);
  92        ceph_decode_8_safe(p, end, b->num_nodes, bad);
  93        b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
  94        if (b->node_weights == NULL)
  95                return -ENOMEM;
  96        ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
  97        for (j = 0; j < b->num_nodes; j++)
  98                b->node_weights[j] = ceph_decode_32(p);
  99        return 0;
 100bad:
 101        return -EINVAL;
 102}
 103
 104static int crush_decode_straw_bucket(void **p, void *end,
 105                                     struct crush_bucket_straw *b)
 106{
 107        int j;
 108        dout("crush_decode_straw_bucket %p to %p\n", *p, end);
 109        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
 110        if (b->item_weights == NULL)
 111                return -ENOMEM;
 112        b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
 113        if (b->straws == NULL)
 114                return -ENOMEM;
 115        ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
 116        for (j = 0; j < b->h.size; j++) {
 117                b->item_weights[j] = ceph_decode_32(p);
 118                b->straws[j] = ceph_decode_32(p);
 119        }
 120        return 0;
 121bad:
 122        return -EINVAL;
 123}
 124
 125static int crush_decode_straw2_bucket(void **p, void *end,
 126                                      struct crush_bucket_straw2 *b)
 127{
 128        int j;
 129        dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
 130        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
 131        if (b->item_weights == NULL)
 132                return -ENOMEM;
 133        ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
 134        for (j = 0; j < b->h.size; j++)
 135                b->item_weights[j] = ceph_decode_32(p);
 136        return 0;
 137bad:
 138        return -EINVAL;
 139}
 140
 141static int skip_name_map(void **p, void *end)
 142{
 143        int len;
 144        ceph_decode_32_safe(p, end, len ,bad);
 145        while (len--) {
 146                int strlen;
 147                *p += sizeof(u32);
 148                ceph_decode_32_safe(p, end, strlen, bad);
 149                *p += strlen;
 150}
 151        return 0;
 152bad:
 153        return -EINVAL;
 154}
 155
 156static struct crush_map *crush_decode(void *pbyval, void *end)
 157{
 158        struct crush_map *c;
 159        int err = -EINVAL;
 160        int i, j;
 161        void **p = &pbyval;
 162        void *start = pbyval;
 163        u32 magic;
 164        u32 num_name_maps;
 165
 166        dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
 167
 168        c = kzalloc(sizeof(*c), GFP_NOFS);
 169        if (c == NULL)
 170                return ERR_PTR(-ENOMEM);
 171
 172        /* set tunables to default values */
 173        c->choose_local_tries = 2;
 174        c->choose_local_fallback_tries = 5;
 175        c->choose_total_tries = 19;
 176        c->chooseleaf_descend_once = 0;
 177
 178        ceph_decode_need(p, end, 4*sizeof(u32), bad);
 179        magic = ceph_decode_32(p);
 180        if (magic != CRUSH_MAGIC) {
 181                pr_err("crush_decode magic %x != current %x\n",
 182                       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
 183                goto bad;
 184        }
 185        c->max_buckets = ceph_decode_32(p);
 186        c->max_rules = ceph_decode_32(p);
 187        c->max_devices = ceph_decode_32(p);
 188
 189        c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
 190        if (c->buckets == NULL)
 191                goto badmem;
 192        c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
 193        if (c->rules == NULL)
 194                goto badmem;
 195
 196        /* buckets */
 197        for (i = 0; i < c->max_buckets; i++) {
 198                int size = 0;
 199                u32 alg;
 200                struct crush_bucket *b;
 201
 202                ceph_decode_32_safe(p, end, alg, bad);
 203                if (alg == 0) {
 204                        c->buckets[i] = NULL;
 205                        continue;
 206                }
 207                dout("crush_decode bucket %d off %x %p to %p\n",
 208                     i, (int)(*p-start), *p, end);
 209
 210                switch (alg) {
 211                case CRUSH_BUCKET_UNIFORM:
 212                        size = sizeof(struct crush_bucket_uniform);
 213                        break;
 214                case CRUSH_BUCKET_LIST:
 215                        size = sizeof(struct crush_bucket_list);
 216                        break;
 217                case CRUSH_BUCKET_TREE:
 218                        size = sizeof(struct crush_bucket_tree);
 219                        break;
 220                case CRUSH_BUCKET_STRAW:
 221                        size = sizeof(struct crush_bucket_straw);
 222                        break;
 223                case CRUSH_BUCKET_STRAW2:
 224                        size = sizeof(struct crush_bucket_straw2);
 225                        break;
 226                default:
 227                        err = -EINVAL;
 228                        goto bad;
 229                }
 230                BUG_ON(size == 0);
 231                b = c->buckets[i] = kzalloc(size, GFP_NOFS);
 232                if (b == NULL)
 233                        goto badmem;
 234
 235                ceph_decode_need(p, end, 4*sizeof(u32), bad);
 236                b->id = ceph_decode_32(p);
 237                b->type = ceph_decode_16(p);
 238                b->alg = ceph_decode_8(p);
 239                b->hash = ceph_decode_8(p);
 240                b->weight = ceph_decode_32(p);
 241                b->size = ceph_decode_32(p);
 242
 243                dout("crush_decode bucket size %d off %x %p to %p\n",
 244                     b->size, (int)(*p-start), *p, end);
 245
 246                b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
 247                if (b->items == NULL)
 248                        goto badmem;
 249                b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS);
 250                if (b->perm == NULL)
 251                        goto badmem;
 252                b->perm_n = 0;
 253
 254                ceph_decode_need(p, end, b->size*sizeof(u32), bad);
 255                for (j = 0; j < b->size; j++)
 256                        b->items[j] = ceph_decode_32(p);
 257
 258                switch (b->alg) {
 259                case CRUSH_BUCKET_UNIFORM:
 260                        err = crush_decode_uniform_bucket(p, end,
 261                                  (struct crush_bucket_uniform *)b);
 262                        if (err < 0)
 263                                goto bad;
 264                        break;
 265                case CRUSH_BUCKET_LIST:
 266                        err = crush_decode_list_bucket(p, end,
 267                               (struct crush_bucket_list *)b);
 268                        if (err < 0)
 269                                goto bad;
 270                        break;
 271                case CRUSH_BUCKET_TREE:
 272                        err = crush_decode_tree_bucket(p, end,
 273                                (struct crush_bucket_tree *)b);
 274                        if (err < 0)
 275                                goto bad;
 276                        break;
 277                case CRUSH_BUCKET_STRAW:
 278                        err = crush_decode_straw_bucket(p, end,
 279                                (struct crush_bucket_straw *)b);
 280                        if (err < 0)
 281                                goto bad;
 282                        break;
 283                case CRUSH_BUCKET_STRAW2:
 284                        err = crush_decode_straw2_bucket(p, end,
 285                                (struct crush_bucket_straw2 *)b);
 286                        if (err < 0)
 287                                goto bad;
 288                        break;
 289                }
 290        }
 291
 292        /* rules */
 293        dout("rule vec is %p\n", c->rules);
 294        for (i = 0; i < c->max_rules; i++) {
 295                u32 yes;
 296                struct crush_rule *r;
 297
 298                ceph_decode_32_safe(p, end, yes, bad);
 299                if (!yes) {
 300                        dout("crush_decode NO rule %d off %x %p to %p\n",
 301                             i, (int)(*p-start), *p, end);
 302                        c->rules[i] = NULL;
 303                        continue;
 304                }
 305
 306                dout("crush_decode rule %d off %x %p to %p\n",
 307                     i, (int)(*p-start), *p, end);
 308
 309                /* len */
 310                ceph_decode_32_safe(p, end, yes, bad);
 311#if BITS_PER_LONG == 32
 312                err = -EINVAL;
 313                if (yes > (ULONG_MAX - sizeof(*r))
 314                          / sizeof(struct crush_rule_step))
 315                        goto bad;
 316#endif
 317                r = c->rules[i] = kmalloc(sizeof(*r) +
 318                                          yes*sizeof(struct crush_rule_step),
 319                                          GFP_NOFS);
 320                if (r == NULL)
 321                        goto badmem;
 322                dout(" rule %d is at %p\n", i, r);
 323                r->len = yes;
 324                ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
 325                ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
 326                for (j = 0; j < r->len; j++) {
 327                        r->steps[j].op = ceph_decode_32(p);
 328                        r->steps[j].arg1 = ceph_decode_32(p);
 329                        r->steps[j].arg2 = ceph_decode_32(p);
 330                }
 331        }
 332
 333        /* ignore trailing name maps. */
 334        for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
 335                err = skip_name_map(p, end);
 336                if (err < 0)
 337                        goto done;
 338        }
 339
 340        /* tunables */
 341        ceph_decode_need(p, end, 3*sizeof(u32), done);
 342        c->choose_local_tries = ceph_decode_32(p);
 343        c->choose_local_fallback_tries =  ceph_decode_32(p);
 344        c->choose_total_tries = ceph_decode_32(p);
 345        dout("crush decode tunable choose_local_tries = %d\n",
 346             c->choose_local_tries);
 347        dout("crush decode tunable choose_local_fallback_tries = %d\n",
 348             c->choose_local_fallback_tries);
 349        dout("crush decode tunable choose_total_tries = %d\n",
 350             c->choose_total_tries);
 351
 352        ceph_decode_need(p, end, sizeof(u32), done);
 353        c->chooseleaf_descend_once = ceph_decode_32(p);
 354        dout("crush decode tunable chooseleaf_descend_once = %d\n",
 355             c->chooseleaf_descend_once);
 356
 357        ceph_decode_need(p, end, sizeof(u8), done);
 358        c->chooseleaf_vary_r = ceph_decode_8(p);
 359        dout("crush decode tunable chooseleaf_vary_r = %d\n",
 360             c->chooseleaf_vary_r);
 361
 362        /* skip straw_calc_version, allowed_bucket_algs */
 363        ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
 364        *p += sizeof(u8) + sizeof(u32);
 365
 366        ceph_decode_need(p, end, sizeof(u8), done);
 367        c->chooseleaf_stable = ceph_decode_8(p);
 368        dout("crush decode tunable chooseleaf_stable = %d\n",
 369             c->chooseleaf_stable);
 370
 371done:
 372        dout("crush_decode success\n");
 373        return c;
 374
 375badmem:
 376        err = -ENOMEM;
 377bad:
 378        dout("crush_decode fail %d\n", err);
 379        crush_destroy(c);
 380        return ERR_PTR(err);
 381}
 382
 383int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
 384{
 385        if (lhs->pool < rhs->pool)
 386                return -1;
 387        if (lhs->pool > rhs->pool)
 388                return 1;
 389        if (lhs->seed < rhs->seed)
 390                return -1;
 391        if (lhs->seed > rhs->seed)
 392                return 1;
 393
 394        return 0;
 395}
 396
 397/*
 398 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 399 * to a set of osds) and primary_temp (explicit primary setting)
 400 */
 401static int __insert_pg_mapping(struct ceph_pg_mapping *new,
 402                               struct rb_root *root)
 403{
 404        struct rb_node **p = &root->rb_node;
 405        struct rb_node *parent = NULL;
 406        struct ceph_pg_mapping *pg = NULL;
 407        int c;
 408
 409        dout("__insert_pg_mapping %llx %p\n", *(u64 *)&new->pgid, new);
 410        while (*p) {
 411                parent = *p;
 412                pg = rb_entry(parent, struct ceph_pg_mapping, node);
 413                c = ceph_pg_compare(&new->pgid, &pg->pgid);
 414                if (c < 0)
 415                        p = &(*p)->rb_left;
 416                else if (c > 0)
 417                        p = &(*p)->rb_right;
 418                else
 419                        return -EEXIST;
 420        }
 421
 422        rb_link_node(&new->node, parent, p);
 423        rb_insert_color(&new->node, root);
 424        return 0;
 425}
 426
 427static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
 428                                                   struct ceph_pg pgid)
 429{
 430        struct rb_node *n = root->rb_node;
 431        struct ceph_pg_mapping *pg;
 432        int c;
 433
 434        while (n) {
 435                pg = rb_entry(n, struct ceph_pg_mapping, node);
 436                c = ceph_pg_compare(&pgid, &pg->pgid);
 437                if (c < 0) {
 438                        n = n->rb_left;
 439                } else if (c > 0) {
 440                        n = n->rb_right;
 441                } else {
 442                        dout("__lookup_pg_mapping %lld.%x got %p\n",
 443                             pgid.pool, pgid.seed, pg);
 444                        return pg;
 445                }
 446        }
 447        return NULL;
 448}
 449
 450static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid)
 451{
 452        struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid);
 453
 454        if (pg) {
 455                dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
 456                     pg);
 457                rb_erase(&pg->node, root);
 458                kfree(pg);
 459                return 0;
 460        }
 461        dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
 462        return -ENOENT;
 463}
 464
 465/*
 466 * rbtree of pg pool info
 467 */
 468static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
 469{
 470        struct rb_node **p = &root->rb_node;
 471        struct rb_node *parent = NULL;
 472        struct ceph_pg_pool_info *pi = NULL;
 473
 474        while (*p) {
 475                parent = *p;
 476                pi = rb_entry(parent, struct ceph_pg_pool_info, node);
 477                if (new->id < pi->id)
 478                        p = &(*p)->rb_left;
 479                else if (new->id > pi->id)
 480                        p = &(*p)->rb_right;
 481                else
 482                        return -EEXIST;
 483        }
 484
 485        rb_link_node(&new->node, parent, p);
 486        rb_insert_color(&new->node, root);
 487        return 0;
 488}
 489
 490static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
 491{
 492        struct ceph_pg_pool_info *pi;
 493        struct rb_node *n = root->rb_node;
 494
 495        while (n) {
 496                pi = rb_entry(n, struct ceph_pg_pool_info, node);
 497                if (id < pi->id)
 498                        n = n->rb_left;
 499                else if (id > pi->id)
 500                        n = n->rb_right;
 501                else
 502                        return pi;
 503        }
 504        return NULL;
 505}
 506
 507struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
 508{
 509        return __lookup_pg_pool(&map->pg_pools, id);
 510}
 511
 512const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
 513{
 514        struct ceph_pg_pool_info *pi;
 515
 516        if (id == CEPH_NOPOOL)
 517                return NULL;
 518
 519        if (WARN_ON_ONCE(id > (u64) INT_MAX))
 520                return NULL;
 521
 522        pi = __lookup_pg_pool(&map->pg_pools, (int) id);
 523
 524        return pi ? pi->name : NULL;
 525}
 526EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
 527
 528int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
 529{
 530        struct rb_node *rbp;
 531
 532        for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
 533                struct ceph_pg_pool_info *pi =
 534                        rb_entry(rbp, struct ceph_pg_pool_info, node);
 535                if (pi->name && strcmp(pi->name, name) == 0)
 536                        return pi->id;
 537        }
 538        return -ENOENT;
 539}
 540EXPORT_SYMBOL(ceph_pg_poolid_by_name);
 541
 542static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 543{
 544        rb_erase(&pi->node, root);
 545        kfree(pi->name);
 546        kfree(pi);
 547}
 548
 549static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 550{
 551        u8 ev, cv;
 552        unsigned len, num;
 553        void *pool_end;
 554
 555        ceph_decode_need(p, end, 2 + 4, bad);
 556        ev = ceph_decode_8(p);  /* encoding version */
 557        cv = ceph_decode_8(p); /* compat version */
 558        if (ev < 5) {
 559                pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
 560                return -EINVAL;
 561        }
 562        if (cv > 9) {
 563                pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
 564                return -EINVAL;
 565        }
 566        len = ceph_decode_32(p);
 567        ceph_decode_need(p, end, len, bad);
 568        pool_end = *p + len;
 569
 570        pi->type = ceph_decode_8(p);
 571        pi->size = ceph_decode_8(p);
 572        pi->crush_ruleset = ceph_decode_8(p);
 573        pi->object_hash = ceph_decode_8(p);
 574
 575        pi->pg_num = ceph_decode_32(p);
 576        pi->pgp_num = ceph_decode_32(p);
 577
 578        *p += 4 + 4;  /* skip lpg* */
 579        *p += 4;      /* skip last_change */
 580        *p += 8 + 4;  /* skip snap_seq, snap_epoch */
 581
 582        /* skip snaps */
 583        num = ceph_decode_32(p);
 584        while (num--) {
 585                *p += 8;  /* snapid key */
 586                *p += 1 + 1; /* versions */
 587                len = ceph_decode_32(p);
 588                *p += len;
 589        }
 590
 591        /* skip removed_snaps */
 592        num = ceph_decode_32(p);
 593        *p += num * (8 + 8);
 594
 595        *p += 8;  /* skip auid */
 596        pi->flags = ceph_decode_64(p);
 597        *p += 4;  /* skip crash_replay_interval */
 598
 599        if (ev >= 7)
 600                pi->min_size = ceph_decode_8(p);
 601        else
 602                pi->min_size = pi->size - pi->size / 2;
 603
 604        if (ev >= 8)
 605                *p += 8 + 8;  /* skip quota_max_* */
 606
 607        if (ev >= 9) {
 608                /* skip tiers */
 609                num = ceph_decode_32(p);
 610                *p += num * 8;
 611
 612                *p += 8;  /* skip tier_of */
 613                *p += 1;  /* skip cache_mode */
 614
 615                pi->read_tier = ceph_decode_64(p);
 616                pi->write_tier = ceph_decode_64(p);
 617        } else {
 618                pi->read_tier = -1;
 619                pi->write_tier = -1;
 620        }
 621
 622        if (ev >= 10) {
 623                /* skip properties */
 624                num = ceph_decode_32(p);
 625                while (num--) {
 626                        len = ceph_decode_32(p);
 627                        *p += len; /* key */
 628                        len = ceph_decode_32(p);
 629                        *p += len; /* val */
 630                }
 631        }
 632
 633        if (ev >= 11) {
 634                /* skip hit_set_params */
 635                *p += 1 + 1; /* versions */
 636                len = ceph_decode_32(p);
 637                *p += len;
 638
 639                *p += 4; /* skip hit_set_period */
 640                *p += 4; /* skip hit_set_count */
 641        }
 642
 643        if (ev >= 12)
 644                *p += 4; /* skip stripe_width */
 645
 646        if (ev >= 13) {
 647                *p += 8; /* skip target_max_bytes */
 648                *p += 8; /* skip target_max_objects */
 649                *p += 4; /* skip cache_target_dirty_ratio_micro */
 650                *p += 4; /* skip cache_target_full_ratio_micro */
 651                *p += 4; /* skip cache_min_flush_age */
 652                *p += 4; /* skip cache_min_evict_age */
 653        }
 654
 655        if (ev >=  14) {
 656                /* skip erasure_code_profile */
 657                len = ceph_decode_32(p);
 658                *p += len;
 659        }
 660
 661        if (ev >= 15)
 662                pi->last_force_request_resend = ceph_decode_32(p);
 663        else
 664                pi->last_force_request_resend = 0;
 665
 666        /* ignore the rest */
 667
 668        *p = pool_end;
 669        calc_pg_masks(pi);
 670        return 0;
 671
 672bad:
 673        return -EINVAL;
 674}
 675
 676static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 677{
 678        struct ceph_pg_pool_info *pi;
 679        u32 num, len;
 680        u64 pool;
 681
 682        ceph_decode_32_safe(p, end, num, bad);
 683        dout(" %d pool names\n", num);
 684        while (num--) {
 685                ceph_decode_64_safe(p, end, pool, bad);
 686                ceph_decode_32_safe(p, end, len, bad);
 687                dout("  pool %llu len %d\n", pool, len);
 688                ceph_decode_need(p, end, len, bad);
 689                pi = __lookup_pg_pool(&map->pg_pools, pool);
 690                if (pi) {
 691                        char *name = kstrndup(*p, len, GFP_NOFS);
 692
 693                        if (!name)
 694                                return -ENOMEM;
 695                        kfree(pi->name);
 696                        pi->name = name;
 697                        dout("  name is %s\n", pi->name);
 698                }
 699                *p += len;
 700        }
 701        return 0;
 702
 703bad:
 704        return -EINVAL;
 705}
 706
 707/*
 708 * osd map
 709 */
 710struct ceph_osdmap *ceph_osdmap_alloc(void)
 711{
 712        struct ceph_osdmap *map;
 713
 714        map = kzalloc(sizeof(*map), GFP_NOIO);
 715        if (!map)
 716                return NULL;
 717
 718        map->pg_pools = RB_ROOT;
 719        map->pool_max = -1;
 720        map->pg_temp = RB_ROOT;
 721        map->primary_temp = RB_ROOT;
 722        mutex_init(&map->crush_scratch_mutex);
 723
 724        return map;
 725}
 726
 727void ceph_osdmap_destroy(struct ceph_osdmap *map)
 728{
 729        dout("osdmap_destroy %p\n", map);
 730        if (map->crush)
 731                crush_destroy(map->crush);
 732        while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 733                struct ceph_pg_mapping *pg =
 734                        rb_entry(rb_first(&map->pg_temp),
 735                                 struct ceph_pg_mapping, node);
 736                rb_erase(&pg->node, &map->pg_temp);
 737                kfree(pg);
 738        }
 739        while (!RB_EMPTY_ROOT(&map->primary_temp)) {
 740                struct ceph_pg_mapping *pg =
 741                        rb_entry(rb_first(&map->primary_temp),
 742                                 struct ceph_pg_mapping, node);
 743                rb_erase(&pg->node, &map->primary_temp);
 744                kfree(pg);
 745        }
 746        while (!RB_EMPTY_ROOT(&map->pg_pools)) {
 747                struct ceph_pg_pool_info *pi =
 748                        rb_entry(rb_first(&map->pg_pools),
 749                                 struct ceph_pg_pool_info, node);
 750                __remove_pg_pool(&map->pg_pools, pi);
 751        }
 752        kfree(map->osd_state);
 753        kfree(map->osd_weight);
 754        kfree(map->osd_addr);
 755        kfree(map->osd_primary_affinity);
 756        kfree(map);
 757}
 758
 759/*
 760 * Adjust max_osd value, (re)allocate arrays.
 761 *
 762 * The new elements are properly initialized.
 763 */
 764static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 765{
 766        u8 *state;
 767        u32 *weight;
 768        struct ceph_entity_addr *addr;
 769        int i;
 770
 771        state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
 772        if (!state)
 773                return -ENOMEM;
 774        map->osd_state = state;
 775
 776        weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
 777        if (!weight)
 778                return -ENOMEM;
 779        map->osd_weight = weight;
 780
 781        addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
 782        if (!addr)
 783                return -ENOMEM;
 784        map->osd_addr = addr;
 785
 786        for (i = map->max_osd; i < max; i++) {
 787                map->osd_state[i] = 0;
 788                map->osd_weight[i] = CEPH_OSD_OUT;
 789                memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
 790        }
 791
 792        if (map->osd_primary_affinity) {
 793                u32 *affinity;
 794
 795                affinity = krealloc(map->osd_primary_affinity,
 796                                    max*sizeof(*affinity), GFP_NOFS);
 797                if (!affinity)
 798                        return -ENOMEM;
 799                map->osd_primary_affinity = affinity;
 800
 801                for (i = map->max_osd; i < max; i++)
 802                        map->osd_primary_affinity[i] =
 803                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
 804        }
 805
 806        map->max_osd = max;
 807
 808        return 0;
 809}
 810
 811#define OSDMAP_WRAPPER_COMPAT_VER       7
 812#define OSDMAP_CLIENT_DATA_COMPAT_VER   1
 813
 814/*
 815 * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
 816 * to struct_v of the client_data section for new (v7 and above)
 817 * osdmaps.
 818 */
 819static int get_osdmap_client_data_v(void **p, void *end,
 820                                    const char *prefix, u8 *v)
 821{
 822        u8 struct_v;
 823
 824        ceph_decode_8_safe(p, end, struct_v, e_inval);
 825        if (struct_v >= 7) {
 826                u8 struct_compat;
 827
 828                ceph_decode_8_safe(p, end, struct_compat, e_inval);
 829                if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
 830                        pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
 831                                struct_v, struct_compat,
 832                                OSDMAP_WRAPPER_COMPAT_VER, prefix);
 833                        return -EINVAL;
 834                }
 835                *p += 4; /* ignore wrapper struct_len */
 836
 837                ceph_decode_8_safe(p, end, struct_v, e_inval);
 838                ceph_decode_8_safe(p, end, struct_compat, e_inval);
 839                if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
 840                        pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
 841                                struct_v, struct_compat,
 842                                OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
 843                        return -EINVAL;
 844                }
 845                *p += 4; /* ignore client data struct_len */
 846        } else {
 847                u16 version;
 848
 849                *p -= 1;
 850                ceph_decode_16_safe(p, end, version, e_inval);
 851                if (version < 6) {
 852                        pr_warn("got v %d < 6 of %s ceph_osdmap\n",
 853                                version, prefix);
 854                        return -EINVAL;
 855                }
 856
 857                /* old osdmap enconding */
 858                struct_v = 0;
 859        }
 860
 861        *v = struct_v;
 862        return 0;
 863
 864e_inval:
 865        return -EINVAL;
 866}
 867
 868static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
 869                          bool incremental)
 870{
 871        u32 n;
 872
 873        ceph_decode_32_safe(p, end, n, e_inval);
 874        while (n--) {
 875                struct ceph_pg_pool_info *pi;
 876                u64 pool;
 877                int ret;
 878
 879                ceph_decode_64_safe(p, end, pool, e_inval);
 880
 881                pi = __lookup_pg_pool(&map->pg_pools, pool);
 882                if (!incremental || !pi) {
 883                        pi = kzalloc(sizeof(*pi), GFP_NOFS);
 884                        if (!pi)
 885                                return -ENOMEM;
 886
 887                        pi->id = pool;
 888
 889                        ret = __insert_pg_pool(&map->pg_pools, pi);
 890                        if (ret) {
 891                                kfree(pi);
 892                                return ret;
 893                        }
 894                }
 895
 896                ret = decode_pool(p, end, pi);
 897                if (ret)
 898                        return ret;
 899        }
 900
 901        return 0;
 902
 903e_inval:
 904        return -EINVAL;
 905}
 906
/*
 * Decode the pool section of a full map (non-incremental mode: a new
 * pool entry is allocated for every record).
 */
static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pools(p, end, map, false);
}
 911
/*
 * Decode the new_pools section of an incremental map (incremental mode:
 * pools already present in @map are updated in place).
 */
static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pools(p, end, map, true);
}
 916
 917static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
 918                            bool incremental)
 919{
 920        u32 n;
 921
 922        ceph_decode_32_safe(p, end, n, e_inval);
 923        while (n--) {
 924                struct ceph_pg pgid;
 925                u32 len, i;
 926                int ret;
 927
 928                ret = ceph_decode_pgid(p, end, &pgid);
 929                if (ret)
 930                        return ret;
 931
 932                ceph_decode_32_safe(p, end, len, e_inval);
 933
 934                ret = __remove_pg_mapping(&map->pg_temp, pgid);
 935                BUG_ON(!incremental && ret != -ENOENT);
 936
 937                if (!incremental || len > 0) {
 938                        struct ceph_pg_mapping *pg;
 939
 940                        ceph_decode_need(p, end, len*sizeof(u32), e_inval);
 941
 942                        if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
 943                                return -EINVAL;
 944
 945                        pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
 946                        if (!pg)
 947                                return -ENOMEM;
 948
 949                        pg->pgid = pgid;
 950                        pg->pg_temp.len = len;
 951                        for (i = 0; i < len; i++)
 952                                pg->pg_temp.osds[i] = ceph_decode_32(p);
 953
 954                        ret = __insert_pg_mapping(pg, &map->pg_temp);
 955                        if (ret) {
 956                                kfree(pg);
 957                                return ret;
 958                        }
 959                }
 960        }
 961
 962        return 0;
 963
 964e_inval:
 965        return -EINVAL;
 966}
 967
/* Decode the pg_temp section of a full map (non-incremental mode). */
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pg_temp(p, end, map, false);
}
 972
/* Decode the new_pg_temp section of an incremental map. */
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pg_temp(p, end, map, true);
}
 977
 978static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
 979                                 bool incremental)
 980{
 981        u32 n;
 982
 983        ceph_decode_32_safe(p, end, n, e_inval);
 984        while (n--) {
 985                struct ceph_pg pgid;
 986                u32 osd;
 987                int ret;
 988
 989                ret = ceph_decode_pgid(p, end, &pgid);
 990                if (ret)
 991                        return ret;
 992
 993                ceph_decode_32_safe(p, end, osd, e_inval);
 994
 995                ret = __remove_pg_mapping(&map->primary_temp, pgid);
 996                BUG_ON(!incremental && ret != -ENOENT);
 997
 998                if (!incremental || osd != (u32)-1) {
 999                        struct ceph_pg_mapping *pg;
1000
1001                        pg = kzalloc(sizeof(*pg), GFP_NOFS);
1002                        if (!pg)
1003                                return -ENOMEM;
1004
1005                        pg->pgid = pgid;
1006                        pg->primary_temp.osd = osd;
1007
1008                        ret = __insert_pg_mapping(pg, &map->primary_temp);
1009                        if (ret) {
1010                                kfree(pg);
1011                                return ret;
1012                        }
1013                }
1014        }
1015
1016        return 0;
1017
1018e_inval:
1019        return -EINVAL;
1020}
1021
/* Decode the primary_temp section of a full map (non-incremental mode). */
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_primary_temp(p, end, map, false);
}
1026
/* Decode the new_primary_temp section of an incremental map. */
static int decode_new_primary_temp(void **p, void *end,
                                   struct ceph_osdmap *map)
{
        return __decode_primary_temp(p, end, map, true);
}
1032
1033u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1034{
1035        BUG_ON(osd >= map->max_osd);
1036
1037        if (!map->osd_primary_affinity)
1038                return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1039
1040        return map->osd_primary_affinity[osd];
1041}
1042
1043static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
1044{
1045        BUG_ON(osd >= map->max_osd);
1046
1047        if (!map->osd_primary_affinity) {
1048                int i;
1049
1050                map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
1051                                                    GFP_NOFS);
1052                if (!map->osd_primary_affinity)
1053                        return -ENOMEM;
1054
1055                for (i = 0; i < map->max_osd; i++)
1056                        map->osd_primary_affinity[i] =
1057                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1058        }
1059
1060        map->osd_primary_affinity[osd] = aff;
1061
1062        return 0;
1063}
1064
1065static int decode_primary_affinity(void **p, void *end,
1066                                   struct ceph_osdmap *map)
1067{
1068        u32 len, i;
1069
1070        ceph_decode_32_safe(p, end, len, e_inval);
1071        if (len == 0) {
1072                kfree(map->osd_primary_affinity);
1073                map->osd_primary_affinity = NULL;
1074                return 0;
1075        }
1076        if (len != map->max_osd)
1077                goto e_inval;
1078
1079        ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
1080
1081        for (i = 0; i < map->max_osd; i++) {
1082                int ret;
1083
1084                ret = set_primary_affinity(map, i, ceph_decode_32(p));
1085                if (ret)
1086                        return ret;
1087        }
1088
1089        return 0;
1090
1091e_inval:
1092        return -EINVAL;
1093}
1094
1095static int decode_new_primary_affinity(void **p, void *end,
1096                                       struct ceph_osdmap *map)
1097{
1098        u32 n;
1099
1100        ceph_decode_32_safe(p, end, n, e_inval);
1101        while (n--) {
1102                u32 osd, aff;
1103                int ret;
1104
1105                ceph_decode_32_safe(p, end, osd, e_inval);
1106                ceph_decode_32_safe(p, end, aff, e_inval);
1107
1108                ret = set_primary_affinity(map, osd, aff);
1109                if (ret)
1110                        return ret;
1111
1112                pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
1113        }
1114
1115        return 0;
1116
1117e_inval:
1118        return -EINVAL;
1119}
1120
/*
 * Decode a full map from [*p, end) into an already allocated @map.
 *
 * Sections are consumed in wire order: header, fsid/epoch/timestamps,
 * pools, pool names, pool_max, flags, the per-osd arrays, pg_temp,
 * primary_temp, primary_affinity and the crush map.  Whatever follows
 * the crush map is skipped.
 *
 * Returns 0 on success or a negative error code.  On failure the error
 * is logged, the buffer is hex-dumped at KERN_DEBUG and @map is not
 * cleaned up here.
 */
static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
{
        u8 struct_v;
        u32 epoch = 0;          /* stays 0 in the error message until decoded */
        void *start = *p;       /* kept for the error offset and hex dump */
        u32 max;
        u32 len, i;
        int err;

        dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

        err = get_osdmap_client_data_v(p, end, "full", &struct_v);
        if (err)
                goto bad;

        /* fsid, epoch, created, modified */
        ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
                         sizeof(map->created) + sizeof(map->modified), e_inval);
        ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
        epoch = map->epoch = ceph_decode_32(p);
        ceph_decode_copy(p, &map->created, sizeof(map->created));
        ceph_decode_copy(p, &map->modified, sizeof(map->modified));

        /* pools */
        err = decode_pools(p, end, map);
        if (err)
                goto bad;

        /* pool_name */
        err = decode_pool_names(p, end, map);
        if (err)
                goto bad;

        ceph_decode_32_safe(p, end, map->pool_max, e_inval);

        ceph_decode_32_safe(p, end, map->flags, e_inval);

        /* max_osd */
        ceph_decode_32_safe(p, end, max, e_inval);

        /* (re)alloc osd arrays */
        err = osdmap_set_max_osd(map, max);
        if (err)
                goto bad;

        /*
         * osd_state, osd_weight, osd_addrs->client_addr
         *
         * One bounds check covers all three arrays together with their
         * three u32 length prefixes; the reads below are unchecked.
         */
        ceph_decode_need(p, end, 3*sizeof(u32) +
                         map->max_osd*(1 + sizeof(*map->osd_weight) +
                                       sizeof(*map->osd_addr)), e_inval);

        /* each array's length prefix must agree with max_osd */
        if (ceph_decode_32(p) != map->max_osd)
                goto e_inval;

        ceph_decode_copy(p, map->osd_state, map->max_osd);

        if (ceph_decode_32(p) != map->max_osd)
                goto e_inval;

        for (i = 0; i < map->max_osd; i++)
                map->osd_weight[i] = ceph_decode_32(p);

        if (ceph_decode_32(p) != map->max_osd)
                goto e_inval;

        /* bulk copy the addresses, then fix each one up in place */
        ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
        for (i = 0; i < map->max_osd; i++)
                ceph_decode_addr(&map->osd_addr[i]);

        /* pg_temp */
        err = decode_pg_temp(p, end, map);
        if (err)
                goto bad;

        /* primary_temp */
        if (struct_v >= 1) {
                err = decode_primary_temp(p, end, map);
                if (err)
                        goto bad;
        }

        /* primary_affinity */
        if (struct_v >= 2) {
                err = decode_primary_affinity(p, end, map);
                if (err)
                        goto bad;
        } else {
                /* XXX can this happen? */
                kfree(map->osd_primary_affinity);
                map->osd_primary_affinity = NULL;
        }

        /* crush: length-prefixed blob, clamped to the end of the buffer */
        ceph_decode_32_safe(p, end, len, e_inval);
        map->crush = crush_decode(*p, min(*p + len, end));
        if (IS_ERR(map->crush)) {
                err = PTR_ERR(map->crush);
                map->crush = NULL;      /* don't leave an ERR_PTR in the map */
                goto bad;
        }
        *p += len;

        /* ignore the rest */
        *p = end;

        dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
        return 0;

e_inval:
        err = -EINVAL;
bad:
        pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
               err, epoch, (int)(*p - start), *p, start, end);
        print_hex_dump(KERN_DEBUG, "osdmap: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       start, end - start, true);
        return err;
}
1241
1242/*
1243 * Allocate and decode a full map.
1244 */
1245struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1246{
1247        struct ceph_osdmap *map;
1248        int ret;
1249
1250        map = ceph_osdmap_alloc();
1251        if (!map)
1252                return ERR_PTR(-ENOMEM);
1253
1254        ret = osdmap_decode(p, end, map);
1255        if (ret) {
1256                ceph_osdmap_destroy(map);
1257                return ERR_PTR(ret);
1258        }
1259
1260        return map;
1261}
1262
1263/*
1264 * Encoding order is (new_up_client, new_state, new_weight).  Need to
1265 * apply in the (new_weight, new_state, new_up_client) order, because
1266 * an incremental map may look like e.g.
1267 *
1268 *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1269 *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1270 */
1271static int decode_new_up_state_weight(void **p, void *end,
1272                                      struct ceph_osdmap *map)
1273{
1274        void *new_up_client;
1275        void *new_state;
1276        void *new_weight_end;
1277        u32 len;
1278
1279        new_up_client = *p;
1280        ceph_decode_32_safe(p, end, len, e_inval);
1281        len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
1282        ceph_decode_need(p, end, len, e_inval);
1283        *p += len;
1284
1285        new_state = *p;
1286        ceph_decode_32_safe(p, end, len, e_inval);
1287        len *= sizeof(u32) + sizeof(u8);
1288        ceph_decode_need(p, end, len, e_inval);
1289        *p += len;
1290
1291        /* new_weight */
1292        ceph_decode_32_safe(p, end, len, e_inval);
1293        while (len--) {
1294                s32 osd;
1295                u32 w;
1296
1297                ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1298                osd = ceph_decode_32(p);
1299                w = ceph_decode_32(p);
1300                BUG_ON(osd >= map->max_osd);
1301                pr_info("osd%d weight 0x%x %s\n", osd, w,
1302                     w == CEPH_OSD_IN ? "(in)" :
1303                     (w == CEPH_OSD_OUT ? "(out)" : ""));
1304                map->osd_weight[osd] = w;
1305
1306                /*
1307                 * If we are marking in, set the EXISTS, and clear the
1308                 * AUTOOUT and NEW bits.
1309                 */
1310                if (w) {
1311                        map->osd_state[osd] |= CEPH_OSD_EXISTS;
1312                        map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1313                                                 CEPH_OSD_NEW);
1314                }
1315        }
1316        new_weight_end = *p;
1317
1318        /* new_state (up/down) */
1319        *p = new_state;
1320        len = ceph_decode_32(p);
1321        while (len--) {
1322                s32 osd;
1323                u8 xorstate;
1324                int ret;
1325
1326                osd = ceph_decode_32(p);
1327                xorstate = ceph_decode_8(p);
1328                if (xorstate == 0)
1329                        xorstate = CEPH_OSD_UP;
1330                BUG_ON(osd >= map->max_osd);
1331                if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1332                    (xorstate & CEPH_OSD_UP))
1333                        pr_info("osd%d down\n", osd);
1334                if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1335                    (xorstate & CEPH_OSD_EXISTS)) {
1336                        pr_info("osd%d does not exist\n", osd);
1337                        map->osd_weight[osd] = CEPH_OSD_IN;
1338                        ret = set_primary_affinity(map, osd,
1339                                                   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1340                        if (ret)
1341                                return ret;
1342                        memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1343                        map->osd_state[osd] = 0;
1344                } else {
1345                        map->osd_state[osd] ^= xorstate;
1346                }
1347        }
1348
1349        /* new_up_client */
1350        *p = new_up_client;
1351        len = ceph_decode_32(p);
1352        while (len--) {
1353                s32 osd;
1354                struct ceph_entity_addr addr;
1355
1356                osd = ceph_decode_32(p);
1357                ceph_decode_copy(p, &addr, sizeof(addr));
1358                ceph_decode_addr(&addr);
1359                BUG_ON(osd >= map->max_osd);
1360                pr_info("osd%d up\n", osd);
1361                map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1362                map->osd_addr[osd] = addr;
1363        }
1364
1365        *p = new_weight_end;
1366        return 0;
1367
1368e_inval:
1369        return -EINVAL;
1370}
1371
/*
 * Decode the incremental map in [*p, end) and apply it to @map.
 *
 * The incremental's epoch must be map->epoch + 1 (BUG otherwise).  If
 * the incremental embeds a full map, that full map is decoded into a
 * newly allocated osdmap which is returned instead; @map is left
 * untouched in that case.  Otherwise @map is modified in place and
 * returned.
 *
 * Returns the resulting map or an ERR_PTR.  On error @map may already
 * have been partially updated; the buffer is hex-dumped at KERN_DEBUG.
 */
struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                                             struct ceph_osdmap *map)
{
        struct crush_map *newcrush = NULL;
        struct ceph_fsid fsid;
        u32 epoch = 0;
        struct ceph_timespec modified;
        s32 len;
        u64 pool;
        __s64 new_pool_max;
        __s32 new_flags, max;
        void *start = *p;       /* kept for the error offset and hex dump */
        int err;
        u8 struct_v;

        dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

        err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
        if (err)
                goto bad;

        /* fsid, epoch, modified, new_pool_max, new_flags */
        ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
                         sizeof(u64) + sizeof(u32), e_inval);
        ceph_decode_copy(p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(p);
        BUG_ON(epoch != map->epoch+1);
        ceph_decode_copy(p, &modified, sizeof(modified));
        new_pool_max = ceph_decode_64(p);
        new_flags = ceph_decode_32(p);

        /* full map? */
        ceph_decode_32_safe(p, end, len, e_inval);
        if (len > 0) {
                dout("apply_incremental full map len %d, %p to %p\n",
                     len, *p, end);
                return ceph_osdmap_decode(p, min(*p+len, end));
        }

        /* new crush? */
        ceph_decode_32_safe(p, end, len, e_inval);
        if (len > 0) {
                newcrush = crush_decode(*p, min(*p+len, end));
                if (IS_ERR(newcrush)) {
                        err = PTR_ERR(newcrush);
                        newcrush = NULL;        /* nothing to destroy at bad: */
                        goto bad;
                }
                *p += len;
        }

        /* new flags? (negative values mean "unchanged") */
        if (new_flags >= 0)
                map->flags = new_flags;
        if (new_pool_max >= 0)
                map->pool_max = new_pool_max;

        /* new max? (negative means "unchanged") */
        ceph_decode_32_safe(p, end, max, e_inval);
        if (max >= 0) {
                err = osdmap_set_max_osd(map, max);
                if (err)
                        goto bad;
        }

        map->epoch++;
        map->modified = modified;
        if (newcrush) {
                if (map->crush)
                        crush_destroy(map->crush);
                map->crush = newcrush;
                newcrush = NULL;        /* now owned by the map */
        }

        /* new_pools */
        err = decode_new_pools(p, end, map);
        if (err)
                goto bad;

        /* new_pool_names */
        err = decode_pool_names(p, end, map);
        if (err)
                goto bad;

        /* old_pool: ids of pools to drop; unknown ids are ignored */
        ceph_decode_32_safe(p, end, len, e_inval);
        while (len--) {
                struct ceph_pg_pool_info *pi;

                ceph_decode_64_safe(p, end, pool, e_inval);
                pi = __lookup_pg_pool(&map->pg_pools, pool);
                if (pi)
                        __remove_pg_pool(&map->pg_pools, pi);
        }

        /* new_up_client, new_state, new_weight */
        err = decode_new_up_state_weight(p, end, map);
        if (err)
                goto bad;

        /* new_pg_temp */
        err = decode_new_pg_temp(p, end, map);
        if (err)
                goto bad;

        /* new_primary_temp */
        if (struct_v >= 1) {
                err = decode_new_primary_temp(p, end, map);
                if (err)
                        goto bad;
        }

        /* new_primary_affinity */
        if (struct_v >= 2) {
                err = decode_new_primary_affinity(p, end, map);
                if (err)
                        goto bad;
        }

        /* ignore the rest */
        *p = end;

        dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
        return map;

e_inval:
        err = -EINVAL;
bad:
        pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
               err, epoch, (int)(*p - start), *p, start, end);
        print_hex_dump(KERN_DEBUG, "osdmap: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       start, end - start, true);
        /* only set if the new crush map was decoded but not yet installed */
        if (newcrush)
                crush_destroy(newcrush);
        return ERR_PTR(err);
}
1512
1513void ceph_oloc_copy(struct ceph_object_locator *dest,
1514                    const struct ceph_object_locator *src)
1515{
1516        WARN_ON(!ceph_oloc_empty(dest));
1517        WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
1518
1519        dest->pool = src->pool;
1520        if (src->pool_ns)
1521                dest->pool_ns = ceph_get_string(src->pool_ns);
1522}
1523EXPORT_SYMBOL(ceph_oloc_copy);
1524
/* Release @oloc's namespace string via ceph_put_string(). */
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
        ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
1530
1531void ceph_oid_copy(struct ceph_object_id *dest,
1532                   const struct ceph_object_id *src)
1533{
1534        WARN_ON(!ceph_oid_empty(dest));
1535
1536        if (src->name != src->inline_name) {
1537                /* very rare, see ceph_object_id definition */
1538                dest->name = kmalloc(src->name_len + 1,
1539                                     GFP_NOIO | __GFP_NOFAIL);
1540        }
1541
1542        memcpy(dest->name, src->name, src->name_len + 1);
1543        dest->name_len = src->name_len;
1544}
1545EXPORT_SYMBOL(ceph_oid_copy);
1546
1547static __printf(2, 0)
1548int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
1549{
1550        int len;
1551
1552        WARN_ON(!ceph_oid_empty(oid));
1553
1554        len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
1555        if (len >= sizeof(oid->inline_name))
1556                return len;
1557
1558        oid->name_len = len;
1559        return 0;
1560}
1561
1562/*
1563 * If oid doesn't fit into inline buffer, BUG.
1564 */
1565void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
1566{
1567        va_list ap;
1568
1569        va_start(ap, fmt);
1570        BUG_ON(oid_printf_vargs(oid, fmt, ap));
1571        va_end(ap);
1572}
1573EXPORT_SYMBOL(ceph_oid_printf);
1574
1575static __printf(3, 0)
1576int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
1577                      const char *fmt, va_list ap)
1578{
1579        va_list aq;
1580        int len;
1581
1582        va_copy(aq, ap);
1583        len = oid_printf_vargs(oid, fmt, aq);
1584        va_end(aq);
1585
1586        if (len) {
1587                char *external_name;
1588
1589                external_name = kmalloc(len + 1, gfp);
1590                if (!external_name)
1591                        return -ENOMEM;
1592
1593                oid->name = external_name;
1594                WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
1595                oid->name_len = len;
1596        }
1597
1598        return 0;
1599}
1600
/*
 * If oid doesn't fit into inline buffer, allocate.
 *
 * Variadic front end for oid_aprintf_vargs(); @gfp is used for the
 * external buffer if one is needed.  Returns what oid_aprintf_vargs()
 * returns: 0 on success or -ENOMEM.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
                     const char *fmt, ...)
{
        va_list ap;
        int ret;

        va_start(ap, fmt);
        ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
        va_end(ap);

        return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);
1617
1618void ceph_oid_destroy(struct ceph_object_id *oid)
1619{
1620        if (oid->name != oid->inline_name)
1621                kfree(oid->name);
1622}
1623EXPORT_SYMBOL(ceph_oid_destroy);
1624
1625/*
1626 * osds only
1627 */
1628static bool __osds_equal(const struct ceph_osds *lhs,
1629                         const struct ceph_osds *rhs)
1630{
1631        if (lhs->size == rhs->size &&
1632            !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
1633                return true;
1634
1635        return false;
1636}
1637
1638/*
1639 * osds + primary
1640 */
1641static bool osds_equal(const struct ceph_osds *lhs,
1642                       const struct ceph_osds *rhs)
1643{
1644        if (__osds_equal(lhs, rhs) &&
1645            lhs->primary == rhs->primary)
1646                return true;
1647
1648        return false;
1649}
1650
1651static bool osds_valid(const struct ceph_osds *set)
1652{
1653        /* non-empty set */
1654        if (set->size > 0 && set->primary >= 0)
1655                return true;
1656
1657        /* empty can_shift_osds set */
1658        if (!set->size && set->primary == -1)
1659                return true;
1660
1661        /* empty !can_shift_osds set - all NONE */
1662        if (set->size > 0 && set->primary == -1) {
1663                int i;
1664
1665                for (i = 0; i < set->size; i++) {
1666                        if (set->osds[i] != CRUSH_ITEM_NONE)
1667                                break;
1668                }
1669                if (i == set->size)
1670                        return true;
1671        }
1672
1673        return false;
1674}
1675
1676void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
1677{
1678        memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
1679        dest->size = src->size;
1680        dest->primary = src->primary;
1681}
1682
1683static bool is_split(const struct ceph_pg *pgid,
1684                     u32 old_pg_num,
1685                     u32 new_pg_num)
1686{
1687        int old_bits = calc_bits_of(old_pg_num);
1688        int old_mask = (1 << old_bits) - 1;
1689        int n;
1690
1691        WARN_ON(pgid->seed >= old_pg_num);
1692        if (new_pg_num <= old_pg_num)
1693                return false;
1694
1695        for (n = 1; ; n++) {
1696                int next_bit = n << (old_bits - 1);
1697                u32 s = next_bit | pgid->seed;
1698
1699                if (s < old_pg_num || s == pgid->seed)
1700                        continue;
1701                if (s >= new_pg_num)
1702                        break;
1703
1704                s = ceph_stable_mod(s, old_pg_num, old_mask);
1705                if (s == pgid->seed)
1706                        return true;
1707        }
1708
1709        return false;
1710}
1711
1712bool ceph_is_new_interval(const struct ceph_osds *old_acting,
1713                          const struct ceph_osds *new_acting,
1714                          const struct ceph_osds *old_up,
1715                          const struct ceph_osds *new_up,
1716                          int old_size,
1717                          int new_size,
1718                          int old_min_size,
1719                          int new_min_size,
1720                          u32 old_pg_num,
1721                          u32 new_pg_num,
1722                          bool old_sort_bitwise,
1723                          bool new_sort_bitwise,
1724                          const struct ceph_pg *pgid)
1725{
1726        return !osds_equal(old_acting, new_acting) ||
1727               !osds_equal(old_up, new_up) ||
1728               old_size != new_size ||
1729               old_min_size != new_min_size ||
1730               is_split(pgid, old_pg_num, new_pg_num) ||
1731               old_sort_bitwise != new_sort_bitwise;
1732}
1733
1734static int calc_pg_rank(int osd, const struct ceph_osds *acting)
1735{
1736        int i;
1737
1738        for (i = 0; i < acting->size; i++) {
1739                if (acting->osds[i] == osd)
1740                        return i;
1741        }
1742
1743        return -1;
1744}
1745
1746static bool primary_changed(const struct ceph_osds *old_acting,
1747                            const struct ceph_osds *new_acting)
1748{
1749        if (!old_acting->size && !new_acting->size)
1750                return false; /* both still empty */
1751
1752        if (!old_acting->size ^ !new_acting->size)
1753                return true; /* was empty, now not, or vice versa */
1754
1755        if (old_acting->primary != new_acting->primary)
1756                return true; /* primary changed */
1757
1758        if (calc_pg_rank(old_acting->primary, old_acting) !=
1759            calc_pg_rank(new_acting->primary, new_acting))
1760                return true;
1761
1762        return false; /* same primary (tho replicas may have changed) */
1763}
1764
1765bool ceph_osds_changed(const struct ceph_osds *old_acting,
1766                       const struct ceph_osds *new_acting,
1767                       bool any_change)
1768{
1769        if (primary_changed(old_acting, new_acting))
1770                return true;
1771
1772        if (any_change && !__osds_equal(old_acting, new_acting))
1773                return true;
1774
1775        return false;
1776}
1777
1778/*
1779 * calculate file layout from given offset, length.
1780 * fill in correct oid, logical length, and object extent
1781 * offset, length.
1782 *
1783 * for now, we write only a single su, until we can
1784 * pass a stride back to the caller.
1785 */
1786int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1787                                   u64 off, u64 len,
1788                                   u64 *ono,
1789                                   u64 *oxoff, u64 *oxlen)
1790{
1791        u32 osize = layout->object_size;
1792        u32 su = layout->stripe_unit;
1793        u32 sc = layout->stripe_count;
1794        u32 bl, stripeno, stripepos, objsetno;
1795        u32 su_per_object;
1796        u64 t, su_offset;
1797
1798        dout("mapping %llu~%llu  osize %u fl_su %u\n", off, len,
1799             osize, su);
1800        if (su == 0 || sc == 0)
1801                goto invalid;
1802        su_per_object = osize / su;
1803        if (su_per_object == 0)
1804                goto invalid;
1805        dout("osize %u / su %u = su_per_object %u\n", osize, su,
1806             su_per_object);
1807
1808        if ((su & ~PAGE_MASK) != 0)
1809                goto invalid;
1810
1811        /* bl = *off / su; */
1812        t = off;
1813        do_div(t, su);
1814        bl = t;
1815        dout("off %llu / su %u = bl %u\n", off, su, bl);
1816
1817        stripeno = bl / sc;
1818        stripepos = bl % sc;
1819        objsetno = stripeno / su_per_object;
1820
1821        *ono = objsetno * sc + stripepos;
1822        dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);
1823
1824        /* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
1825        t = off;
1826        su_offset = do_div(t, su);
1827        *oxoff = su_offset + (stripeno % su_per_object) * su;
1828
1829        /*
1830         * Calculate the length of the extent being written to the selected
1831         * object. This is the minimum of the full length requested (len) or
1832         * the remainder of the current stripe being written to.
1833         */
1834        *oxlen = min_t(u64, len, su - su_offset);
1835
1836        dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
1837        return 0;
1838
1839invalid:
1840        dout(" invalid layout\n");
1841        *ono = 0;
1842        *oxoff = 0;
1843        *oxlen = 0;
1844        return -EINVAL;
1845}
1846EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1847
1848/*
1849 * Map an object into a PG.
1850 *
1851 * Should only be called with target_oid and target_oloc (as opposed to
1852 * base_oid and base_oloc), since tiering isn't taken into account.
1853 */
1854int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
1855                              struct ceph_object_id *oid,
1856                              struct ceph_object_locator *oloc,
1857                              struct ceph_pg *raw_pgid)
1858{
1859        struct ceph_pg_pool_info *pi;
1860
1861        pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
1862        if (!pi)
1863                return -ENOENT;
1864
1865        if (!oloc->pool_ns) {
1866                raw_pgid->pool = oloc->pool;
1867                raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
1868                                             oid->name_len);
1869                dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
1870                     raw_pgid->pool, raw_pgid->seed);
1871        } else {
1872                char stack_buf[256];
1873                char *buf = stack_buf;
1874                int nsl = oloc->pool_ns->len;
1875                size_t total = nsl + 1 + oid->name_len;
1876
1877                if (total > sizeof(stack_buf)) {
1878                        buf = kmalloc(total, GFP_NOIO);
1879                        if (!buf)
1880                                return -ENOMEM;
1881                }
1882                memcpy(buf, oloc->pool_ns->str, nsl);
1883                buf[nsl] = '\037';
1884                memcpy(buf + nsl + 1, oid->name, oid->name_len);
1885                raw_pgid->pool = oloc->pool;
1886                raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
1887                if (buf != stack_buf)
1888                        kfree(buf);
1889                dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
1890                     oid->name, nsl, oloc->pool_ns->str,
1891                     raw_pgid->pool, raw_pgid->seed);
1892        }
1893        return 0;
1894}
1895EXPORT_SYMBOL(ceph_object_locator_to_pg);
1896
1897/*
1898 * Map a raw PG (full precision ps) into an actual PG.
1899 */
1900static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
1901                         const struct ceph_pg *raw_pgid,
1902                         struct ceph_pg *pgid)
1903{
1904        pgid->pool = raw_pgid->pool;
1905        pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
1906                                     pi->pg_num_mask);
1907}
1908
1909/*
1910 * Map a raw PG (full precision ps) into a placement ps (placement
1911 * seed).  Include pool id in that value so that different pools don't
1912 * use the same seeds.
1913 */
1914static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
1915                         const struct ceph_pg *raw_pgid)
1916{
1917        if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
1918                /* hash pool id and seed so that pool PGs do not overlap */
1919                return crush_hash32_2(CRUSH_HASH_RJENKINS1,
1920                                      ceph_stable_mod(raw_pgid->seed,
1921                                                      pi->pgp_num,
1922                                                      pi->pgp_num_mask),
1923                                      raw_pgid->pool);
1924        } else {
1925                /*
1926                 * legacy behavior: add ps and pool together.  this is
1927                 * not a great approach because the PGs from each pool
1928                 * will overlap on top of each other: 0.5 == 1.4 ==
1929                 * 2.3 == ...
1930                 */
1931                return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
1932                                       pi->pgp_num_mask) +
1933                       (unsigned)raw_pgid->pool;
1934        }
1935}
1936
1937static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1938                    int *result, int result_max,
1939                    const __u32 *weight, int weight_max)
1940{
1941        int r;
1942
1943        BUG_ON(result_max > CEPH_PG_MAX_SIZE);
1944
1945        mutex_lock(&map->crush_scratch_mutex);
1946        r = crush_do_rule(map->crush, ruleno, x, result, result_max,
1947                          weight, weight_max, map->crush_scratch_ary);
1948        mutex_unlock(&map->crush_scratch_mutex);
1949
1950        return r;
1951}
1952
1953/*
1954 * Calculate raw set (CRUSH output) for given PG.  The result may
1955 * contain nonexistent OSDs.  ->primary is undefined for a raw set.
1956 *
1957 * Placement seed (CRUSH input) is returned through @ppps.
1958 */
1959static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
1960                           struct ceph_pg_pool_info *pi,
1961                           const struct ceph_pg *raw_pgid,
1962                           struct ceph_osds *raw,
1963                           u32 *ppps)
1964{
1965        u32 pps = raw_pg_to_pps(pi, raw_pgid);
1966        int ruleno;
1967        int len;
1968
1969        ceph_osds_init(raw);
1970        if (ppps)
1971                *ppps = pps;
1972
1973        ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
1974                                 pi->size);
1975        if (ruleno < 0) {
1976                pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
1977                       pi->id, pi->crush_ruleset, pi->type, pi->size);
1978                return;
1979        }
1980
1981        len = do_crush(osdmap, ruleno, pps, raw->osds,
1982                       min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
1983                       osdmap->osd_weight, osdmap->max_osd);
1984        if (len < 0) {
1985                pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
1986                       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
1987                       pi->size);
1988                return;
1989        }
1990
1991        raw->size = len;
1992}
1993
1994/*
1995 * Given raw set, calculate up set and up primary.  By definition of an
1996 * up set, the result won't contain nonexistent or down OSDs.
1997 *
1998 * This is done in-place - on return @set is the up set.  If it's
1999 * empty, ->primary will remain undefined.
2000 */
2001static void raw_to_up_osds(struct ceph_osdmap *osdmap,
2002                           struct ceph_pg_pool_info *pi,
2003                           struct ceph_osds *set)
2004{
2005        int i;
2006
2007        /* ->primary is undefined for a raw set */
2008        BUG_ON(set->primary != -1);
2009
2010        if (ceph_can_shift_osds(pi)) {
2011                int removed = 0;
2012
2013                /* shift left */
2014                for (i = 0; i < set->size; i++) {
2015                        if (ceph_osd_is_down(osdmap, set->osds[i])) {
2016                                removed++;
2017                                continue;
2018                        }
2019                        if (removed)
2020                                set->osds[i - removed] = set->osds[i];
2021                }
2022                set->size -= removed;
2023                if (set->size > 0)
2024                        set->primary = set->osds[0];
2025        } else {
2026                /* set down/dne devices to NONE */
2027                for (i = set->size - 1; i >= 0; i--) {
2028                        if (ceph_osd_is_down(osdmap, set->osds[i]))
2029                                set->osds[i] = CRUSH_ITEM_NONE;
2030                        else
2031                                set->primary = set->osds[i];
2032                }
2033        }
2034}
2035
2036static void apply_primary_affinity(struct ceph_osdmap *osdmap,
2037                                   struct ceph_pg_pool_info *pi,
2038                                   u32 pps,
2039                                   struct ceph_osds *up)
2040{
2041        int i;
2042        int pos = -1;
2043
2044        /*
2045         * Do we have any non-default primary_affinity values for these
2046         * osds?
2047         */
2048        if (!osdmap->osd_primary_affinity)
2049                return;
2050
2051        for (i = 0; i < up->size; i++) {
2052                int osd = up->osds[i];
2053
2054                if (osd != CRUSH_ITEM_NONE &&
2055                    osdmap->osd_primary_affinity[osd] !=
2056                                        CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2057                        break;
2058                }
2059        }
2060        if (i == up->size)
2061                return;
2062
2063        /*
2064         * Pick the primary.  Feed both the seed (for the pg) and the
2065         * osd into the hash/rng so that a proportional fraction of an
2066         * osd's pgs get rejected as primary.
2067         */
2068        for (i = 0; i < up->size; i++) {
2069                int osd = up->osds[i];
2070                u32 aff;
2071
2072                if (osd == CRUSH_ITEM_NONE)
2073                        continue;
2074
2075                aff = osdmap->osd_primary_affinity[osd];
2076                if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2077                    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
2078                                    pps, osd) >> 16) >= aff) {
2079                        /*
2080                         * We chose not to use this primary.  Note it
2081                         * anyway as a fallback in case we don't pick
2082                         * anyone else, but keep looking.
2083                         */
2084                        if (pos < 0)
2085                                pos = i;
2086                } else {
2087                        pos = i;
2088                        break;
2089                }
2090        }
2091        if (pos < 0)
2092                return;
2093
2094        up->primary = up->osds[pos];
2095
2096        if (ceph_can_shift_osds(pi) && pos > 0) {
2097                /* move the new primary to the front */
2098                for (i = pos; i > 0; i--)
2099                        up->osds[i] = up->osds[i - 1];
2100                up->osds[0] = up->primary;
2101        }
2102}
2103
2104/*
2105 * Get pg_temp and primary_temp mappings for given PG.
2106 *
2107 * Note that a PG may have none, only pg_temp, only primary_temp or
2108 * both pg_temp and primary_temp mappings.  This means @temp isn't
2109 * always a valid OSD set on return: in the "only primary_temp" case,
2110 * @temp will have its ->primary >= 0 but ->size == 0.
2111 */
2112static void get_temp_osds(struct ceph_osdmap *osdmap,
2113                          struct ceph_pg_pool_info *pi,
2114                          const struct ceph_pg *raw_pgid,
2115                          struct ceph_osds *temp)
2116{
2117        struct ceph_pg pgid;
2118        struct ceph_pg_mapping *pg;
2119        int i;
2120
2121        raw_pg_to_pg(pi, raw_pgid, &pgid);
2122        ceph_osds_init(temp);
2123
2124        /* pg_temp? */
2125        pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
2126        if (pg) {
2127                for (i = 0; i < pg->pg_temp.len; i++) {
2128                        if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2129                                if (ceph_can_shift_osds(pi))
2130                                        continue;
2131
2132                                temp->osds[temp->size++] = CRUSH_ITEM_NONE;
2133                        } else {
2134                                temp->osds[temp->size++] = pg->pg_temp.osds[i];
2135                        }
2136                }
2137
2138                /* apply pg_temp's primary */
2139                for (i = 0; i < temp->size; i++) {
2140                        if (temp->osds[i] != CRUSH_ITEM_NONE) {
2141                                temp->primary = temp->osds[i];
2142                                break;
2143                        }
2144                }
2145        }
2146
2147        /* primary_temp? */
2148        pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
2149        if (pg)
2150                temp->primary = pg->primary_temp.osd;
2151}
2152
2153/*
2154 * Map a PG to its acting set as well as its up set.
2155 *
2156 * Acting set is used for data mapping purposes, while up set can be
2157 * recorded for detecting interval changes and deciding whether to
2158 * resend a request.
2159 */
2160void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2161                               const struct ceph_pg *raw_pgid,
2162                               struct ceph_osds *up,
2163                               struct ceph_osds *acting)
2164{
2165        struct ceph_pg_pool_info *pi;
2166        u32 pps;
2167
2168        pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2169        if (!pi) {
2170                ceph_osds_init(up);
2171                ceph_osds_init(acting);
2172                goto out;
2173        }
2174
2175        pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2176        raw_to_up_osds(osdmap, pi, up);
2177        apply_primary_affinity(osdmap, pi, pps, up);
2178        get_temp_osds(osdmap, pi, raw_pgid, acting);
2179        if (!acting->size) {
2180                memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2181                acting->size = up->size;
2182                if (acting->primary == -1)
2183                        acting->primary = up->primary;
2184        }
2185out:
2186        WARN_ON(!osds_valid(up) || !osds_valid(acting));
2187}
2188
2189/*
2190 * Return acting primary for given PG, or -1 if none.
2191 */
2192int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2193                              const struct ceph_pg *raw_pgid)
2194{
2195        struct ceph_osds up, acting;
2196
2197        ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting);
2198        return acting.primary;
2199}
2200EXPORT_SYMBOL(ceph_pg_to_acting_primary);
2201