/*
 * linux/net/ceph/osdmap.c
 *
 * Ceph OSD map and CRUSH map decoding and manipulation.
 */
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/module.h>
   5#include <linux/slab.h>
   6#include <asm/div64.h>
   7
   8#include <linux/ceph/libceph.h>
   9#include <linux/ceph/osdmap.h>
  10#include <linux/ceph/decode.h>
  11#include <linux/crush/hash.h>
  12#include <linux/crush/mapper.h>
  13
  14char *ceph_osdmap_state_str(char *str, int len, int state)
  15{
  16        if (!len)
  17                return str;
  18
  19        if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
  20                snprintf(str, len, "exists, up");
  21        else if (state & CEPH_OSD_EXISTS)
  22                snprintf(str, len, "exists");
  23        else if (state & CEPH_OSD_UP)
  24                snprintf(str, len, "up");
  25        else
  26                snprintf(str, len, "doesn't exist");
  27
  28        return str;
  29}
  30
  31/* maps */
  32
  33static int calc_bits_of(unsigned int t)
  34{
  35        int b = 0;
  36        while (t) {
  37                t = t >> 1;
  38                b++;
  39        }
  40        return b;
  41}
  42
  43/*
  44 * the foo_mask is the smallest value 2^n-1 that is >= foo.
  45 */
  46static void calc_pg_masks(struct ceph_pg_pool_info *pi)
  47{
  48        pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
  49        pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
  50}
  51
/*
 * decode crush map
 */

/*
 * Decode the uniform-bucket payload that follows the common bucket
 * header: a single shared per-item weight.
 *
 * NOTE(review): the bound requires (1+size) u32s even though only one
 * u32 is consumed here — presumably matching the on-wire encoding of
 * this bucket type; confirm against the Ceph encoder.
 *
 * Returns 0 on success, -EINVAL on truncated input.
 */
static int crush_decode_uniform_bucket(void **p, void *end,
                                       struct crush_bucket_uniform *b)
{
        dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
        ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
        b->item_weight = ceph_decode_32(p);
        return 0;
bad:
        return -EINVAL;
}
  65
/*
 * Decode the list-bucket payload: per-item weights and running sums,
 * b->h.size entries of each, interleaved on the wire.
 *
 * On allocation failure the already-allocated array is not freed
 * here; the bucket is linked into c->buckets by the caller, whose
 * error path tears the whole map down via crush_destroy().
 *
 * Returns 0, -ENOMEM, or -EINVAL on truncated input.
 */
static int crush_decode_list_bucket(void **p, void *end,
                                    struct crush_bucket_list *b)
{
        int j;
        dout("crush_decode_list_bucket %p to %p\n", *p, end);
        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->item_weights == NULL)
                return -ENOMEM;
        b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->sum_weights == NULL)
                return -ENOMEM;
        /* one (weight, sum) pair per item */
        ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
        for (j = 0; j < b->h.size; j++) {
                b->item_weights[j] = ceph_decode_32(p);
                b->sum_weights[j] = ceph_decode_32(p);
        }
        return 0;
bad:
        return -EINVAL;
}
  86
/*
 * Decode the tree-bucket payload: a u8 node count followed by one
 * u32 weight per node.  num_nodes is decoded (bounds-checked) before
 * it is used to size the allocation and the second bounds check.
 *
 * Returns 0, -ENOMEM, or -EINVAL on truncated input.
 */
static int crush_decode_tree_bucket(void **p, void *end,
                                    struct crush_bucket_tree *b)
{
        int j;
        dout("crush_decode_tree_bucket %p to %p\n", *p, end);
        ceph_decode_8_safe(p, end, b->num_nodes, bad);
        b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
        if (b->node_weights == NULL)
                return -ENOMEM;
        ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
        for (j = 0; j < b->num_nodes; j++)
                b->node_weights[j] = ceph_decode_32(p);
        return 0;
bad:
        return -EINVAL;
}
 103
/*
 * Decode the straw-bucket payload: per-item weights and precomputed
 * straw values, b->h.size entries of each, interleaved on the wire.
 *
 * Partially-allocated arrays on -ENOMEM are freed by the caller's
 * crush_destroy() error path (the bucket is already in c->buckets).
 *
 * Returns 0, -ENOMEM, or -EINVAL on truncated input.
 */
static int crush_decode_straw_bucket(void **p, void *end,
                                     struct crush_bucket_straw *b)
{
        int j;
        dout("crush_decode_straw_bucket %p to %p\n", *p, end);
        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->item_weights == NULL)
                return -ENOMEM;
        b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->straws == NULL)
                return -ENOMEM;
        /* one (weight, straw) pair per item */
        ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
        for (j = 0; j < b->h.size; j++) {
                b->item_weights[j] = ceph_decode_32(p);
                b->straws[j] = ceph_decode_32(p);
        }
        return 0;
bad:
        return -EINVAL;
}
 124
/*
 * Decode the straw2-bucket payload: one u32 weight per item (straw2
 * computes straw values on the fly, so only weights are encoded).
 *
 * Returns 0, -ENOMEM, or -EINVAL on truncated input.
 */
static int crush_decode_straw2_bucket(void **p, void *end,
                                      struct crush_bucket_straw2 *b)
{
        int j;
        dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->item_weights == NULL)
                return -ENOMEM;
        ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
        for (j = 0; j < b->h.size; j++)
                b->item_weights[j] = ceph_decode_32(p);
        return 0;
bad:
        return -EINVAL;
}
 140
 141static int skip_name_map(void **p, void *end)
 142{
 143        int len;
 144        ceph_decode_32_safe(p, end, len ,bad);
 145        while (len--) {
 146                int strlen;
 147                *p += sizeof(u32);
 148                ceph_decode_32_safe(p, end, strlen, bad);
 149                *p += strlen;
 150}
 151        return 0;
 152bad:
 153        return -EINVAL;
 154}
 155
/*
 * Precompute c->working_size: the scratch space the mapping code
 * needs for this map.  It consists of a crush_work header, one
 * crush_work_bucket pointer per bucket slot, then for each present
 * bucket a crush_work_bucket plus its permutation array (one __u32
 * per item).
 */
static void crush_finalize(struct crush_map *c)
{
        __s32 b;

        /* Space for the array of pointers to per-bucket workspace */
        c->working_size = sizeof(struct crush_work) +
            c->max_buckets * sizeof(struct crush_work_bucket *);

        for (b = 0; b < c->max_buckets; b++) {
                if (!c->buckets[b])
                        continue;

                switch (c->buckets[b]->alg) {
                default:
                        /*
                         * The base case, permutation variables and
                         * the pointer to the permutation array.
                         */
                        c->working_size += sizeof(struct crush_work_bucket);
                        break;
                }
                /* Every bucket has a permutation array. */
                c->working_size += c->buckets[b]->size * sizeof(__u32);
        }
}
 181
/*
 * Decode an on-wire CRUSH map from [pbyval, end) into a freshly
 * allocated struct crush_map.
 *
 * Layout: magic + max_buckets/max_rules/max_devices header, the
 * bucket array, the rule array, then optional trailing sections
 * (name maps, tunables).  The trailing sections jump to "done" on
 * truncation rather than failing, so older/shorter encodings decode
 * successfully with default tunables.
 *
 * Returns the new map, or ERR_PTR(-EINVAL/-ENOMEM); on failure any
 * partially built map is torn down via crush_destroy().
 */
static struct crush_map *crush_decode(void *pbyval, void *end)
{
        struct crush_map *c;
        int err = -EINVAL;
        int i, j;
        void **p = &pbyval;
        void *start = pbyval;
        u32 magic;
        u32 num_name_maps;

        dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));

        c = kzalloc(sizeof(*c), GFP_NOFS);
        if (c == NULL)
                return ERR_PTR(-ENOMEM);

        /* set tunables to default values */
        c->choose_local_tries = 2;
        c->choose_local_fallback_tries = 5;
        c->choose_total_tries = 19;
        c->chooseleaf_descend_once = 0;

        /* header: magic, max_buckets, max_rules, max_devices */
        ceph_decode_need(p, end, 4*sizeof(u32), bad);
        magic = ceph_decode_32(p);
        if (magic != CRUSH_MAGIC) {
                pr_err("crush_decode magic %x != current %x\n",
                       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
                goto bad;
        }
        c->max_buckets = ceph_decode_32(p);
        c->max_rules = ceph_decode_32(p);
        c->max_devices = ceph_decode_32(p);

        c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
        if (c->buckets == NULL)
                goto badmem;
        c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
        if (c->rules == NULL)
                goto badmem;

        /* buckets */
        for (i = 0; i < c->max_buckets; i++) {
                int size = 0;
                u32 alg;
                struct crush_bucket *b;

                /* alg 0 encodes an empty bucket slot */
                ceph_decode_32_safe(p, end, alg, bad);
                if (alg == 0) {
                        c->buckets[i] = NULL;
                        continue;
                }
                dout("crush_decode bucket %d off %x %p to %p\n",
                     i, (int)(*p-start), *p, end);

                /* allocation size of the alg-specific bucket struct */
                switch (alg) {
                case CRUSH_BUCKET_UNIFORM:
                        size = sizeof(struct crush_bucket_uniform);
                        break;
                case CRUSH_BUCKET_LIST:
                        size = sizeof(struct crush_bucket_list);
                        break;
                case CRUSH_BUCKET_TREE:
                        size = sizeof(struct crush_bucket_tree);
                        break;
                case CRUSH_BUCKET_STRAW:
                        size = sizeof(struct crush_bucket_straw);
                        break;
                case CRUSH_BUCKET_STRAW2:
                        size = sizeof(struct crush_bucket_straw2);
                        break;
                default:
                        err = -EINVAL;
                        goto bad;
                }
                BUG_ON(size == 0);
                /* link into c->buckets first so error paths can free it */
                b = c->buckets[i] = kzalloc(size, GFP_NOFS);
                if (b == NULL)
                        goto badmem;

                /* common header: 16 bytes (= 4 u32s worth) */
                ceph_decode_need(p, end, 4*sizeof(u32), bad);
                b->id = ceph_decode_32(p);
                b->type = ceph_decode_16(p);
                b->alg = ceph_decode_8(p);
                b->hash = ceph_decode_8(p);
                b->weight = ceph_decode_32(p);
                b->size = ceph_decode_32(p);

                dout("crush_decode bucket size %d off %x %p to %p\n",
                     b->size, (int)(*p-start), *p, end);

                b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
                if (b->items == NULL)
                        goto badmem;

                ceph_decode_need(p, end, b->size*sizeof(u32), bad);
                for (j = 0; j < b->size; j++)
                        b->items[j] = ceph_decode_32(p);

                /* alg-specific payload follows the common fields */
                switch (b->alg) {
                case CRUSH_BUCKET_UNIFORM:
                        err = crush_decode_uniform_bucket(p, end,
                                  (struct crush_bucket_uniform *)b);
                        if (err < 0)
                                goto bad;
                        break;
                case CRUSH_BUCKET_LIST:
                        err = crush_decode_list_bucket(p, end,
                               (struct crush_bucket_list *)b);
                        if (err < 0)
                                goto bad;
                        break;
                case CRUSH_BUCKET_TREE:
                        err = crush_decode_tree_bucket(p, end,
                                (struct crush_bucket_tree *)b);
                        if (err < 0)
                                goto bad;
                        break;
                case CRUSH_BUCKET_STRAW:
                        err = crush_decode_straw_bucket(p, end,
                                (struct crush_bucket_straw *)b);
                        if (err < 0)
                                goto bad;
                        break;
                case CRUSH_BUCKET_STRAW2:
                        err = crush_decode_straw2_bucket(p, end,
                                (struct crush_bucket_straw2 *)b);
                        if (err < 0)
                                goto bad;
                        break;
                }
        }

        /* rules */
        dout("rule vec is %p\n", c->rules);
        for (i = 0; i < c->max_rules; i++) {
                u32 yes;
                struct crush_rule *r;

                /* presence flag: 0 means no rule in this slot */
                ceph_decode_32_safe(p, end, yes, bad);
                if (!yes) {
                        dout("crush_decode NO rule %d off %x %p to %p\n",
                             i, (int)(*p-start), *p, end);
                        c->rules[i] = NULL;
                        continue;
                }

                dout("crush_decode rule %d off %x %p to %p\n",
                     i, (int)(*p-start), *p, end);

                /* len */
                ceph_decode_32_safe(p, end, yes, bad);
#if BITS_PER_LONG == 32
                /* guard the kmalloc size computation against overflow */
                err = -EINVAL;
                if (yes > (ULONG_MAX - sizeof(*r))
                          / sizeof(struct crush_rule_step))
                        goto bad;
#endif
                /* rule header + flexible array of steps */
                r = c->rules[i] = kmalloc(sizeof(*r) +
                                          yes*sizeof(struct crush_rule_step),
                                          GFP_NOFS);
                if (r == NULL)
                        goto badmem;
                dout(" rule %d is at %p\n", i, r);
                r->len = yes;
                ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
                ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
                for (j = 0; j < r->len; j++) {
                        r->steps[j].op = ceph_decode_32(p);
                        r->steps[j].arg1 = ceph_decode_32(p);
                        r->steps[j].arg2 = ceph_decode_32(p);
                }
        }

        /* ignore trailing name maps. */
        for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
                err = skip_name_map(p, end);
                if (err < 0)
                        goto done;  /* truncated: accept map, keep defaults */
        }

        /* tunables (each group optional; truncation ends decoding) */
        ceph_decode_need(p, end, 3*sizeof(u32), done);
        c->choose_local_tries = ceph_decode_32(p);
        c->choose_local_fallback_tries =  ceph_decode_32(p);
        c->choose_total_tries = ceph_decode_32(p);
        dout("crush decode tunable choose_local_tries = %d\n",
             c->choose_local_tries);
        dout("crush decode tunable choose_local_fallback_tries = %d\n",
             c->choose_local_fallback_tries);
        dout("crush decode tunable choose_total_tries = %d\n",
             c->choose_total_tries);

        ceph_decode_need(p, end, sizeof(u32), done);
        c->chooseleaf_descend_once = ceph_decode_32(p);
        dout("crush decode tunable chooseleaf_descend_once = %d\n",
             c->chooseleaf_descend_once);

        ceph_decode_need(p, end, sizeof(u8), done);
        c->chooseleaf_vary_r = ceph_decode_8(p);
        dout("crush decode tunable chooseleaf_vary_r = %d\n",
             c->chooseleaf_vary_r);

        /* skip straw_calc_version, allowed_bucket_algs */
        ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
        *p += sizeof(u8) + sizeof(u32);

        ceph_decode_need(p, end, sizeof(u8), done);
        c->chooseleaf_stable = ceph_decode_8(p);
        dout("crush decode tunable chooseleaf_stable = %d\n",
             c->chooseleaf_stable);

done:
        crush_finalize(c);
        dout("crush_decode success\n");
        return c;

badmem:
        err = -ENOMEM;
bad:
        dout("crush_decode fail %d\n", err);
        crush_destroy(c);
        return ERR_PTR(err);
}
 405
 406int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
 407{
 408        if (lhs->pool < rhs->pool)
 409                return -1;
 410        if (lhs->pool > rhs->pool)
 411                return 1;
 412        if (lhs->seed < rhs->seed)
 413                return -1;
 414        if (lhs->seed > rhs->seed)
 415                return 1;
 416
 417        return 0;
 418}
 419
/*
 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 * to a set of osds) and primary_temp (explicit primary setting)
 */

/*
 * Insert @new into the tree, keyed by pgid (via ceph_pg_compare).
 * Returns 0, or -EEXIST if an entry with the same pgid is already
 * present (in which case @new is not inserted and not freed).
 */
static int __insert_pg_mapping(struct ceph_pg_mapping *new,
                               struct rb_root *root)
{
        struct rb_node **p = &root->rb_node;
        struct rb_node *parent = NULL;
        struct ceph_pg_mapping *pg = NULL;
        int c;

        /* debug: raw first 8 bytes of the pgid */
        dout("__insert_pg_mapping %llx %p\n", *(u64 *)&new->pgid, new);
        while (*p) {
                parent = *p;
                pg = rb_entry(parent, struct ceph_pg_mapping, node);
                c = ceph_pg_compare(&new->pgid, &pg->pgid);
                if (c < 0)
                        p = &(*p)->rb_left;
                else if (c > 0)
                        p = &(*p)->rb_right;
                else
                        return -EEXIST;
        }

        rb_link_node(&new->node, parent, p);
        rb_insert_color(&new->node, root);
        return 0;
}
 449
 450static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
 451                                                   struct ceph_pg pgid)
 452{
 453        struct rb_node *n = root->rb_node;
 454        struct ceph_pg_mapping *pg;
 455        int c;
 456
 457        while (n) {
 458                pg = rb_entry(n, struct ceph_pg_mapping, node);
 459                c = ceph_pg_compare(&pgid, &pg->pgid);
 460                if (c < 0) {
 461                        n = n->rb_left;
 462                } else if (c > 0) {
 463                        n = n->rb_right;
 464                } else {
 465                        dout("__lookup_pg_mapping %lld.%x got %p\n",
 466                             pgid.pool, pgid.seed, pg);
 467                        return pg;
 468                }
 469        }
 470        return NULL;
 471}
 472
 473static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid)
 474{
 475        struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid);
 476
 477        if (pg) {
 478                dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
 479                     pg);
 480                rb_erase(&pg->node, root);
 481                kfree(pg);
 482                return 0;
 483        }
 484        dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
 485        return -ENOENT;
 486}
 487
 488/*
 489 * rbtree of pg pool info
 490 */
 491static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
 492{
 493        struct rb_node **p = &root->rb_node;
 494        struct rb_node *parent = NULL;
 495        struct ceph_pg_pool_info *pi = NULL;
 496
 497        while (*p) {
 498                parent = *p;
 499                pi = rb_entry(parent, struct ceph_pg_pool_info, node);
 500                if (new->id < pi->id)
 501                        p = &(*p)->rb_left;
 502                else if (new->id > pi->id)
 503                        p = &(*p)->rb_right;
 504                else
 505                        return -EEXIST;
 506        }
 507
 508        rb_link_node(&new->node, parent, p);
 509        rb_insert_color(&new->node, root);
 510        return 0;
 511}
 512
 513static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
 514{
 515        struct ceph_pg_pool_info *pi;
 516        struct rb_node *n = root->rb_node;
 517
 518        while (n) {
 519                pi = rb_entry(n, struct ceph_pg_pool_info, node);
 520                if (id < pi->id)
 521                        n = n->rb_left;
 522                else if (id > pi->id)
 523                        n = n->rb_right;
 524                else
 525                        return pi;
 526        }
 527        return NULL;
 528}
 529
/*
 * Look up pool info by id; returns NULL if the pool does not exist.
 */
struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
        return __lookup_pg_pool(&map->pg_pools, id);
}
 534
 535const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
 536{
 537        struct ceph_pg_pool_info *pi;
 538
 539        if (id == CEPH_NOPOOL)
 540                return NULL;
 541
 542        if (WARN_ON_ONCE(id > (u64) INT_MAX))
 543                return NULL;
 544
 545        pi = __lookup_pg_pool(&map->pg_pools, (int) id);
 546
 547        return pi ? pi->name : NULL;
 548}
 549EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
 550
 551int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
 552{
 553        struct rb_node *rbp;
 554
 555        for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
 556                struct ceph_pg_pool_info *pi =
 557                        rb_entry(rbp, struct ceph_pg_pool_info, node);
 558                if (pi->name && strcmp(pi->name, name) == 0)
 559                        return pi->id;
 560        }
 561        return -ENOENT;
 562}
 563EXPORT_SYMBOL(ceph_pg_poolid_by_name);
 564
/*
 * Unlink @pi from the pool tree and free it (including its name).
 */
static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
        rb_erase(&pi->node, root);
        kfree(pi->name);
        kfree(pi);
}
 571
/*
 * Decode a single ceph_pg_pool_info from *p..end.
 *
 * The encoding is versioned: ev is the encoding version, cv the
 * oldest version able to decode it; we require ev >= 5 and cv <= 9.
 * A u32 length header bounds the pool blob, and *p is snapped to
 * pool_end at the end so unknown trailing fields from newer
 * encodings are ignored.
 *
 * NOTE(review): fields below are decoded without per-field bounds
 * checks; the initial ceph_decode_need() only guarantees len bytes
 * total, so this relies on the length header being consistent with
 * the encoded fields — verify against the Ceph encoder.
 *
 * Returns 0, or -EINVAL on version mismatch or truncated input.
 */
static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
        u8 ev, cv;
        unsigned len, num;
        void *pool_end;

        ceph_decode_need(p, end, 2 + 4, bad);
        ev = ceph_decode_8(p);  /* encoding version */
        cv = ceph_decode_8(p); /* compat version */
        if (ev < 5) {
                pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
                return -EINVAL;
        }
        if (cv > 9) {
                pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
                return -EINVAL;
        }
        len = ceph_decode_32(p);
        ceph_decode_need(p, end, len, bad);
        pool_end = *p + len;

        pi->type = ceph_decode_8(p);
        pi->size = ceph_decode_8(p);
        pi->crush_ruleset = ceph_decode_8(p);
        pi->object_hash = ceph_decode_8(p);

        pi->pg_num = ceph_decode_32(p);
        pi->pgp_num = ceph_decode_32(p);

        *p += 4 + 4;  /* skip lpg* */
        *p += 4;      /* skip last_change */
        *p += 8 + 4;  /* skip snap_seq, snap_epoch */

        /* skip snaps: map of snapid -> versioned, length-prefixed blob */
        num = ceph_decode_32(p);
        while (num--) {
                *p += 8;  /* snapid key */
                *p += 1 + 1; /* versions */
                len = ceph_decode_32(p);
                *p += len;
        }

        /* skip removed_snaps: interval set of (start, len) u64 pairs */
        num = ceph_decode_32(p);
        *p += num * (8 + 8);

        *p += 8;  /* skip auid */
        pi->flags = ceph_decode_64(p);
        *p += 4;  /* skip crash_replay_interval */

        if (ev >= 7)
                pi->min_size = ceph_decode_8(p);
        else
                /* pre-v7 default: half of size, rounded up */
                pi->min_size = pi->size - pi->size / 2;

        if (ev >= 8)
                *p += 8 + 8;  /* skip quota_max_* */

        if (ev >= 9) {
                /* skip tiers */
                num = ceph_decode_32(p);
                *p += num * 8;

                *p += 8;  /* skip tier_of */
                *p += 1;  /* skip cache_mode */

                pi->read_tier = ceph_decode_64(p);
                pi->write_tier = ceph_decode_64(p);
        } else {
                pi->read_tier = -1;
                pi->write_tier = -1;
        }

        if (ev >= 10) {
                /* skip properties: map of string -> string */
                num = ceph_decode_32(p);
                while (num--) {
                        len = ceph_decode_32(p);
                        *p += len; /* key */
                        len = ceph_decode_32(p);
                        *p += len; /* val */
                }
        }

        if (ev >= 11) {
                /* skip hit_set_params */
                *p += 1 + 1; /* versions */
                len = ceph_decode_32(p);
                *p += len;

                *p += 4; /* skip hit_set_period */
                *p += 4; /* skip hit_set_count */
        }

        if (ev >= 12)
                *p += 4; /* skip stripe_width */

        if (ev >= 13) {
                *p += 8; /* skip target_max_bytes */
                *p += 8; /* skip target_max_objects */
                *p += 4; /* skip cache_target_dirty_ratio_micro */
                *p += 4; /* skip cache_target_full_ratio_micro */
                *p += 4; /* skip cache_min_flush_age */
                *p += 4; /* skip cache_min_evict_age */
        }

        if (ev >=  14) {
                /* skip erasure_code_profile */
                len = ceph_decode_32(p);
                *p += len;
        }

        if (ev >= 15)
                pi->last_force_request_resend = ceph_decode_32(p);
        else
                pi->last_force_request_resend = 0;

        /* ignore the rest */

        *p = pool_end;
        calc_pg_masks(pi);
        return 0;

bad:
        return -EINVAL;
}
 698
/*
 * Decode the pool-id -> name map and attach names to existing pool
 * info entries.  Names for unknown pool ids are skipped (the bytes
 * are still consumed so decoding stays in sync).
 *
 * Returns 0, -ENOMEM, or -EINVAL on truncated input.
 */
static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
        struct ceph_pg_pool_info *pi;
        u32 num, len;
        u64 pool;

        ceph_decode_32_safe(p, end, num, bad);
        dout(" %d pool names\n", num);
        while (num--) {
                ceph_decode_64_safe(p, end, pool, bad);
                ceph_decode_32_safe(p, end, len, bad);
                dout("  pool %llu len %d\n", pool, len);
                ceph_decode_need(p, end, len, bad);
                pi = __lookup_pg_pool(&map->pg_pools, pool);
                if (pi) {
                        char *name = kstrndup(*p, len, GFP_NOFS);

                        if (!name)
                                return -ENOMEM;
                        /* replace any previously decoded name */
                        kfree(pi->name);
                        pi->name = name;
                        dout("  name is %s\n", pi->name);
                }
                *p += len;
        }
        return 0;

bad:
        return -EINVAL;
}
 729
 730/*
 731 * osd map
 732 */
 733struct ceph_osdmap *ceph_osdmap_alloc(void)
 734{
 735        struct ceph_osdmap *map;
 736
 737        map = kzalloc(sizeof(*map), GFP_NOIO);
 738        if (!map)
 739                return NULL;
 740
 741        map->pg_pools = RB_ROOT;
 742        map->pool_max = -1;
 743        map->pg_temp = RB_ROOT;
 744        map->primary_temp = RB_ROOT;
 745        mutex_init(&map->crush_workspace_mutex);
 746
 747        return map;
 748}
 749
/*
 * Free an osdmap and everything it owns: the crush map, the
 * pg_temp/primary_temp mapping trees, the pool info tree (including
 * pool names), the per-osd arrays, and the crush workspace.
 */
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
        dout("osdmap_destroy %p\n", map);
        if (map->crush)
                crush_destroy(map->crush);
        /* drain each rbtree by repeatedly erasing its first node */
        while (!RB_EMPTY_ROOT(&map->pg_temp)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->pg_temp),
                                 struct ceph_pg_mapping, node);
                rb_erase(&pg->node, &map->pg_temp);
                kfree(pg);
        }
        while (!RB_EMPTY_ROOT(&map->primary_temp)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->primary_temp),
                                 struct ceph_pg_mapping, node);
                rb_erase(&pg->node, &map->primary_temp);
                kfree(pg);
        }
        while (!RB_EMPTY_ROOT(&map->pg_pools)) {
                struct ceph_pg_pool_info *pi =
                        rb_entry(rb_first(&map->pg_pools),
                                 struct ceph_pg_pool_info, node);
                __remove_pg_pool(&map->pg_pools, pi);
        }
        kfree(map->osd_state);
        kfree(map->osd_weight);
        kfree(map->osd_addr);
        kfree(map->osd_primary_affinity);
        kfree(map->crush_workspace);
        kfree(map);
}
 782
/*
 * Adjust max_osd value, (re)allocate arrays.
 *
 * The new elements are properly initialized: new slots get state 0,
 * weight CEPH_OSD_OUT, a zeroed address, and (if the affinity array
 * exists) the default primary affinity.
 *
 * Returns 0 or -ENOMEM.  On -ENOMEM the old arrays are untouched
 * (krealloc keeps the original buffer on failure) and will be freed
 * by ceph_osdmap_destroy(), so no memory is leaked.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
{
        u8 *state;
        u32 *weight;
        struct ceph_entity_addr *addr;
        int i;

        state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
        if (!state)
                return -ENOMEM;
        map->osd_state = state;

        weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
        if (!weight)
                return -ENOMEM;
        map->osd_weight = weight;

        addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
        if (!addr)
                return -ENOMEM;
        map->osd_addr = addr;

        /* initialize only the newly added slots */
        for (i = map->max_osd; i < max; i++) {
                map->osd_state[i] = 0;
                map->osd_weight[i] = CEPH_OSD_OUT;
                memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
        }

        /* the affinity array is allocated lazily; only grow it if present */
        if (map->osd_primary_affinity) {
                u32 *affinity;

                affinity = krealloc(map->osd_primary_affinity,
                                    max*sizeof(*affinity), GFP_NOFS);
                if (!affinity)
                        return -ENOMEM;
                map->osd_primary_affinity = affinity;

                for (i = map->max_osd; i < max; i++)
                        map->osd_primary_affinity[i] =
                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
        }

        map->max_osd = max;

        return 0;
}
 834
/*
 * Install a new crush map and a freshly sized scratch workspace on
 * @map, replacing (and freeing) any previous ones.
 *
 * Takes ownership of @crush: an ERR_PTR is propagated as-is, and on
 * workspace allocation failure @crush is destroyed.  On any failure
 * the osdmap itself is left unchanged.
 */
static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
        void *workspace;
        size_t work_size;

        if (IS_ERR(crush))
                return PTR_ERR(crush);

        /* workspace sized for the largest possible acting set */
        work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
        dout("%s work_size %zu bytes\n", __func__, work_size);
        workspace = kmalloc(work_size, GFP_NOIO);
        if (!workspace) {
                crush_destroy(crush);
                return -ENOMEM;
        }
        crush_init_workspace(crush, workspace);

        if (map->crush)
                crush_destroy(map->crush);
        kfree(map->crush_workspace);
        map->crush = crush;
        map->crush_workspace = workspace;
        return 0;
}
 859
 860#define OSDMAP_WRAPPER_COMPAT_VER       7
 861#define OSDMAP_CLIENT_DATA_COMPAT_VER   1
 862
/*
 * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
 * to struct_v of the client_data section for new (v7 and above)
 * osdmaps.
 *
 * v7+ maps carry a (struct_v, struct_compat, struct_len) wrapper
 * around a client-data section with its own version triple; v6 maps
 * start with a bare u16 version.  @prefix ("full" / "inc" style tag)
 * is only used in warning messages.
 */
static int get_osdmap_client_data_v(void **p, void *end,
                                    const char *prefix, u8 *v)
{
        u8 struct_v;

        ceph_decode_8_safe(p, end, struct_v, e_inval);
        if (struct_v >= 7) {
                u8 struct_compat;

                ceph_decode_8_safe(p, end, struct_compat, e_inval);
                if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
                        pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
                                struct_v, struct_compat,
                                OSDMAP_WRAPPER_COMPAT_VER, prefix);
                        return -EINVAL;
                }
                *p += 4; /* ignore wrapper struct_len */

                ceph_decode_8_safe(p, end, struct_v, e_inval);
                ceph_decode_8_safe(p, end, struct_compat, e_inval);
                if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
                        pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
                                struct_v, struct_compat,
                                OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
                        return -EINVAL;
                }
                *p += 4; /* ignore client data struct_len */
        } else {
                u16 version;

                /* re-read the first byte as the low byte of a u16 version */
                *p -= 1;
                ceph_decode_16_safe(p, end, version, e_inval);
                if (version < 6) {
                        pr_warn("got v %d < 6 of %s ceph_osdmap\n",
                                version, prefix);
                        return -EINVAL;
                }

                /* old osdmap encoding */
                struct_v = 0;
        }

        *v = struct_v;
        return 0;

e_inval:
        return -EINVAL;
}
 916
/*
 * Decode a list of (pool id, pg_pool_t) pairs into map->pg_pools.
 * For an incremental map an existing pool entry is updated in place;
 * otherwise a fresh entry is created and inserted first.
 */
static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
                          bool incremental)
{
        u32 n;

        ceph_decode_32_safe(p, end, n, e_inval);
        while (n--) {
                struct ceph_pg_pool_info *pi;
                u64 pool;
                int ret;

                ceph_decode_64_safe(p, end, pool, e_inval);

                pi = __lookup_pg_pool(&map->pg_pools, pool);
                if (!incremental || !pi) {
                        /* full map, or a pool we haven't seen before */
                        pi = kzalloc(sizeof(*pi), GFP_NOFS);
                        if (!pi)
                                return -ENOMEM;

                        pi->id = pool;

                        ret = __insert_pg_pool(&map->pg_pools, pi);
                        if (ret) {
                                kfree(pi);
                                return ret;
                        }
                }

                /* on error pi stays in the tree; freed with the map */
                ret = decode_pool(p, end, pi);
                if (ret)
                        return ret;
        }

        return 0;

e_inval:
        return -EINVAL;
}
 955
/* Decode the pool list of a full map. */
static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pools(p, end, map, false);
}
 960
/* Decode the new_pools section of an incremental map. */
static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pools(p, end, map, true);
}
 965
/*
 * Decode pg_temp entries (explicit temporary osd sets for a pg).
 * Each entry first drops any existing mapping for the pgid; a
 * non-empty osd list then installs a replacement.  In an incremental
 * map an empty list is how an entry is removed.
 */
static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
                            bool incremental)
{
        u32 n;

        ceph_decode_32_safe(p, end, n, e_inval);
        while (n--) {
                struct ceph_pg pgid;
                u32 len, i;
                int ret;

                ret = ceph_decode_pgid(p, end, &pgid);
                if (ret)
                        return ret;

                ceph_decode_32_safe(p, end, len, e_inval);

                /* a full map must not mention the same pgid twice */
                ret = __remove_pg_mapping(&map->pg_temp, pgid);
                BUG_ON(!incremental && ret != -ENOENT);

                if (!incremental || len > 0) {
                        struct ceph_pg_mapping *pg;

                        ceph_decode_need(p, end, len*sizeof(u32), e_inval);

                        /* guard the kzalloc() size computation below */
                        if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
                                return -EINVAL;

                        pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
                        if (!pg)
                                return -ENOMEM;

                        pg->pgid = pgid;
                        pg->pg_temp.len = len;
                        for (i = 0; i < len; i++)
                                pg->pg_temp.osds[i] = ceph_decode_32(p);

                        ret = __insert_pg_mapping(pg, &map->pg_temp);
                        if (ret) {
                                kfree(pg);
                                return ret;
                        }
                }
        }

        return 0;

e_inval:
        return -EINVAL;
}
1016
/* Decode the pg_temp section of a full map. */
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pg_temp(p, end, map, false);
}
1021
/* Decode the new_pg_temp section of an incremental map. */
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pg_temp(p, end, map, true);
}
1026
1027static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
1028                                 bool incremental)
1029{
1030        u32 n;
1031
1032        ceph_decode_32_safe(p, end, n, e_inval);
1033        while (n--) {
1034                struct ceph_pg pgid;
1035                u32 osd;
1036                int ret;
1037
1038                ret = ceph_decode_pgid(p, end, &pgid);
1039                if (ret)
1040                        return ret;
1041
1042                ceph_decode_32_safe(p, end, osd, e_inval);
1043
1044                ret = __remove_pg_mapping(&map->primary_temp, pgid);
1045                BUG_ON(!incremental && ret != -ENOENT);
1046
1047                if (!incremental || osd != (u32)-1) {
1048                        struct ceph_pg_mapping *pg;
1049
1050                        pg = kzalloc(sizeof(*pg), GFP_NOFS);
1051                        if (!pg)
1052                                return -ENOMEM;
1053
1054                        pg->pgid = pgid;
1055                        pg->primary_temp.osd = osd;
1056
1057                        ret = __insert_pg_mapping(pg, &map->primary_temp);
1058                        if (ret) {
1059                                kfree(pg);
1060                                return ret;
1061                        }
1062                }
1063        }
1064
1065        return 0;
1066
1067e_inval:
1068        return -EINVAL;
1069}
1070
/* Decode the primary_temp section of a full map. */
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_primary_temp(p, end, map, false);
}
1075
/* Decode the new_primary_temp section of an incremental map. */
static int decode_new_primary_temp(void **p, void *end,
                                   struct ceph_osdmap *map)
{
        return __decode_primary_temp(p, end, map, true);
}
1081
1082u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1083{
1084        BUG_ON(osd >= map->max_osd);
1085
1086        if (!map->osd_primary_affinity)
1087                return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1088
1089        return map->osd_primary_affinity[osd];
1090}
1091
1092static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
1093{
1094        BUG_ON(osd >= map->max_osd);
1095
1096        if (!map->osd_primary_affinity) {
1097                int i;
1098
1099                map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
1100                                                    GFP_NOFS);
1101                if (!map->osd_primary_affinity)
1102                        return -ENOMEM;
1103
1104                for (i = 0; i < map->max_osd; i++)
1105                        map->osd_primary_affinity[i] =
1106                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1107        }
1108
1109        map->osd_primary_affinity[osd] = aff;
1110
1111        return 0;
1112}
1113
/*
 * Decode the full-map osd_primary_affinity vector.  An empty vector
 * means "all default" and drops the array entirely; otherwise its
 * length must match max_osd exactly.
 */
static int decode_primary_affinity(void **p, void *end,
                                   struct ceph_osdmap *map)
{
        u32 len, i;

        ceph_decode_32_safe(p, end, len, e_inval);
        if (len == 0) {
                /* no explicit affinities - fall back to the default */
                kfree(map->osd_primary_affinity);
                map->osd_primary_affinity = NULL;
                return 0;
        }
        if (len != map->max_osd)
                goto e_inval;

        ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

        for (i = 0; i < map->max_osd; i++) {
                int ret;

                ret = set_primary_affinity(map, i, ceph_decode_32(p));
                if (ret)
                        return ret;
        }

        return 0;

e_inval:
        return -EINVAL;
}
1143
1144static int decode_new_primary_affinity(void **p, void *end,
1145                                       struct ceph_osdmap *map)
1146{
1147        u32 n;
1148
1149        ceph_decode_32_safe(p, end, n, e_inval);
1150        while (n--) {
1151                u32 osd, aff;
1152                int ret;
1153
1154                ceph_decode_32_safe(p, end, osd, e_inval);
1155                ceph_decode_32_safe(p, end, aff, e_inval);
1156
1157                ret = set_primary_affinity(map, osd, aff);
1158                if (ret)
1159                        return ret;
1160
1161                pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
1162        }
1163
1164        return 0;
1165
1166e_inval:
1167        return -EINVAL;
1168}
1169
1170/*
1171 * decode a full map.
1172 */
1173static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1174{
1175        u8 struct_v;
1176        u32 epoch = 0;
1177        void *start = *p;
1178        u32 max;
1179        u32 len, i;
1180        int err;
1181
1182        dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1183
1184        err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1185        if (err)
1186                goto bad;
1187
1188        /* fsid, epoch, created, modified */
1189        ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1190                         sizeof(map->created) + sizeof(map->modified), e_inval);
1191        ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
1192        epoch = map->epoch = ceph_decode_32(p);
1193        ceph_decode_copy(p, &map->created, sizeof(map->created));
1194        ceph_decode_copy(p, &map->modified, sizeof(map->modified));
1195
1196        /* pools */
1197        err = decode_pools(p, end, map);
1198        if (err)
1199                goto bad;
1200
1201        /* pool_name */
1202        err = decode_pool_names(p, end, map);
1203        if (err)
1204                goto bad;
1205
1206        ceph_decode_32_safe(p, end, map->pool_max, e_inval);
1207
1208        ceph_decode_32_safe(p, end, map->flags, e_inval);
1209
1210        /* max_osd */
1211        ceph_decode_32_safe(p, end, max, e_inval);
1212
1213        /* (re)alloc osd arrays */
1214        err = osdmap_set_max_osd(map, max);
1215        if (err)
1216                goto bad;
1217
1218        /* osd_state, osd_weight, osd_addrs->client_addr */
1219        ceph_decode_need(p, end, 3*sizeof(u32) +
1220                         map->max_osd*(1 + sizeof(*map->osd_weight) +
1221                                       sizeof(*map->osd_addr)), e_inval);
1222
1223        if (ceph_decode_32(p) != map->max_osd)
1224                goto e_inval;
1225
1226        ceph_decode_copy(p, map->osd_state, map->max_osd);
1227
1228        if (ceph_decode_32(p) != map->max_osd)
1229                goto e_inval;
1230
1231        for (i = 0; i < map->max_osd; i++)
1232                map->osd_weight[i] = ceph_decode_32(p);
1233
1234        if (ceph_decode_32(p) != map->max_osd)
1235                goto e_inval;
1236
1237        ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
1238        for (i = 0; i < map->max_osd; i++)
1239                ceph_decode_addr(&map->osd_addr[i]);
1240
1241        /* pg_temp */
1242        err = decode_pg_temp(p, end, map);
1243        if (err)
1244                goto bad;
1245
1246        /* primary_temp */
1247        if (struct_v >= 1) {
1248                err = decode_primary_temp(p, end, map);
1249                if (err)
1250                        goto bad;
1251        }
1252
1253        /* primary_affinity */
1254        if (struct_v >= 2) {
1255                err = decode_primary_affinity(p, end, map);
1256                if (err)
1257                        goto bad;
1258        } else {
1259                /* XXX can this happen? */
1260                kfree(map->osd_primary_affinity);
1261                map->osd_primary_affinity = NULL;
1262        }
1263
1264        /* crush */
1265        ceph_decode_32_safe(p, end, len, e_inval);
1266        err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
1267        if (err)
1268                goto bad;
1269
1270        /* ignore the rest */
1271        *p = end;
1272
1273        dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1274        return 0;
1275
1276e_inval:
1277        err = -EINVAL;
1278bad:
1279        pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1280               err, epoch, (int)(*p - start), *p, start, end);
1281        print_hex_dump(KERN_DEBUG, "osdmap: ",
1282                       DUMP_PREFIX_OFFSET, 16, 1,
1283                       start, end - start, true);
1284        return err;
1285}
1286
1287/*
1288 * Allocate and decode a full map.
1289 */
1290struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1291{
1292        struct ceph_osdmap *map;
1293        int ret;
1294
1295        map = ceph_osdmap_alloc();
1296        if (!map)
1297                return ERR_PTR(-ENOMEM);
1298
1299        ret = osdmap_decode(p, end, map);
1300        if (ret) {
1301                ceph_osdmap_destroy(map);
1302                return ERR_PTR(ret);
1303        }
1304
1305        return map;
1306}
1307
1308/*
1309 * Encoding order is (new_up_client, new_state, new_weight).  Need to
1310 * apply in the (new_weight, new_state, new_up_client) order, because
1311 * an incremental map may look like e.g.
1312 *
1313 *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1314 *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1315 */
1316static int decode_new_up_state_weight(void **p, void *end,
1317                                      struct ceph_osdmap *map)
1318{
1319        void *new_up_client;
1320        void *new_state;
1321        void *new_weight_end;
1322        u32 len;
1323
1324        new_up_client = *p;
1325        ceph_decode_32_safe(p, end, len, e_inval);
1326        len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
1327        ceph_decode_need(p, end, len, e_inval);
1328        *p += len;
1329
1330        new_state = *p;
1331        ceph_decode_32_safe(p, end, len, e_inval);
1332        len *= sizeof(u32) + sizeof(u8);
1333        ceph_decode_need(p, end, len, e_inval);
1334        *p += len;
1335
1336        /* new_weight */
1337        ceph_decode_32_safe(p, end, len, e_inval);
1338        while (len--) {
1339                s32 osd;
1340                u32 w;
1341
1342                ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1343                osd = ceph_decode_32(p);
1344                w = ceph_decode_32(p);
1345                BUG_ON(osd >= map->max_osd);
1346                pr_info("osd%d weight 0x%x %s\n", osd, w,
1347                     w == CEPH_OSD_IN ? "(in)" :
1348                     (w == CEPH_OSD_OUT ? "(out)" : ""));
1349                map->osd_weight[osd] = w;
1350
1351                /*
1352                 * If we are marking in, set the EXISTS, and clear the
1353                 * AUTOOUT and NEW bits.
1354                 */
1355                if (w) {
1356                        map->osd_state[osd] |= CEPH_OSD_EXISTS;
1357                        map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1358                                                 CEPH_OSD_NEW);
1359                }
1360        }
1361        new_weight_end = *p;
1362
1363        /* new_state (up/down) */
1364        *p = new_state;
1365        len = ceph_decode_32(p);
1366        while (len--) {
1367                s32 osd;
1368                u8 xorstate;
1369                int ret;
1370
1371                osd = ceph_decode_32(p);
1372                xorstate = ceph_decode_8(p);
1373                if (xorstate == 0)
1374                        xorstate = CEPH_OSD_UP;
1375                BUG_ON(osd >= map->max_osd);
1376                if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1377                    (xorstate & CEPH_OSD_UP))
1378                        pr_info("osd%d down\n", osd);
1379                if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1380                    (xorstate & CEPH_OSD_EXISTS)) {
1381                        pr_info("osd%d does not exist\n", osd);
1382                        ret = set_primary_affinity(map, osd,
1383                                                   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1384                        if (ret)
1385                                return ret;
1386                        memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1387                        map->osd_state[osd] = 0;
1388                } else {
1389                        map->osd_state[osd] ^= xorstate;
1390                }
1391        }
1392
1393        /* new_up_client */
1394        *p = new_up_client;
1395        len = ceph_decode_32(p);
1396        while (len--) {
1397                s32 osd;
1398                struct ceph_entity_addr addr;
1399
1400                osd = ceph_decode_32(p);
1401                ceph_decode_copy(p, &addr, sizeof(addr));
1402                ceph_decode_addr(&addr);
1403                BUG_ON(osd >= map->max_osd);
1404                pr_info("osd%d up\n", osd);
1405                map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1406                map->osd_addr[osd] = addr;
1407        }
1408
1409        *p = new_weight_end;
1410        return 0;
1411
1412e_inval:
1413        return -EINVAL;
1414}
1415
1416/*
1417 * decode and apply an incremental map update.
1418 */
1419struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
1420                                             struct ceph_osdmap *map)
1421{
1422        struct ceph_fsid fsid;
1423        u32 epoch = 0;
1424        struct ceph_timespec modified;
1425        s32 len;
1426        u64 pool;
1427        __s64 new_pool_max;
1428        __s32 new_flags, max;
1429        void *start = *p;
1430        int err;
1431        u8 struct_v;
1432
1433        dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1434
1435        err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
1436        if (err)
1437                goto bad;
1438
1439        /* fsid, epoch, modified, new_pool_max, new_flags */
1440        ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1441                         sizeof(u64) + sizeof(u32), e_inval);
1442        ceph_decode_copy(p, &fsid, sizeof(fsid));
1443        epoch = ceph_decode_32(p);
1444        BUG_ON(epoch != map->epoch+1);
1445        ceph_decode_copy(p, &modified, sizeof(modified));
1446        new_pool_max = ceph_decode_64(p);
1447        new_flags = ceph_decode_32(p);
1448
1449        /* full map? */
1450        ceph_decode_32_safe(p, end, len, e_inval);
1451        if (len > 0) {
1452                dout("apply_incremental full map len %d, %p to %p\n",
1453                     len, *p, end);
1454                return ceph_osdmap_decode(p, min(*p+len, end));
1455        }
1456
1457        /* new crush? */
1458        ceph_decode_32_safe(p, end, len, e_inval);
1459        if (len > 0) {
1460                err = osdmap_set_crush(map,
1461                                       crush_decode(*p, min(*p + len, end)));
1462                if (err)
1463                        goto bad;
1464                *p += len;
1465        }
1466
1467        /* new flags? */
1468        if (new_flags >= 0)
1469                map->flags = new_flags;
1470        if (new_pool_max >= 0)
1471                map->pool_max = new_pool_max;
1472
1473        /* new max? */
1474        ceph_decode_32_safe(p, end, max, e_inval);
1475        if (max >= 0) {
1476                err = osdmap_set_max_osd(map, max);
1477                if (err)
1478                        goto bad;
1479        }
1480
1481        map->epoch++;
1482        map->modified = modified;
1483
1484        /* new_pools */
1485        err = decode_new_pools(p, end, map);
1486        if (err)
1487                goto bad;
1488
1489        /* new_pool_names */
1490        err = decode_pool_names(p, end, map);
1491        if (err)
1492                goto bad;
1493
1494        /* old_pool */
1495        ceph_decode_32_safe(p, end, len, e_inval);
1496        while (len--) {
1497                struct ceph_pg_pool_info *pi;
1498
1499                ceph_decode_64_safe(p, end, pool, e_inval);
1500                pi = __lookup_pg_pool(&map->pg_pools, pool);
1501                if (pi)
1502                        __remove_pg_pool(&map->pg_pools, pi);
1503        }
1504
1505        /* new_up_client, new_state, new_weight */
1506        err = decode_new_up_state_weight(p, end, map);
1507        if (err)
1508                goto bad;
1509
1510        /* new_pg_temp */
1511        err = decode_new_pg_temp(p, end, map);
1512        if (err)
1513                goto bad;
1514
1515        /* new_primary_temp */
1516        if (struct_v >= 1) {
1517                err = decode_new_primary_temp(p, end, map);
1518                if (err)
1519                        goto bad;
1520        }
1521
1522        /* new_primary_affinity */
1523        if (struct_v >= 2) {
1524                err = decode_new_primary_affinity(p, end, map);
1525                if (err)
1526                        goto bad;
1527        }
1528
1529        /* ignore the rest */
1530        *p = end;
1531
1532        dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1533        return map;
1534
1535e_inval:
1536        err = -EINVAL;
1537bad:
1538        pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1539               err, epoch, (int)(*p - start), *p, start, end);
1540        print_hex_dump(KERN_DEBUG, "osdmap: ",
1541                       DUMP_PREFIX_OFFSET, 16, 1,
1542                       start, end - start, true);
1543        return ERR_PTR(err);
1544}
1545
/*
 * Copy @src into @dest, which must be empty (no pool, no pool_ns).
 * Takes a reference on src->pool_ns, if any.
 */
void ceph_oloc_copy(struct ceph_object_locator *dest,
                    const struct ceph_object_locator *src)
{
        WARN_ON(!ceph_oloc_empty(dest));
        WARN_ON(dest->pool_ns); /* empty() only covers ->pool */

        dest->pool = src->pool;
        if (src->pool_ns)
                dest->pool_ns = ceph_get_string(src->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_copy);
1557
/* Release the pool_ns string reference held by @oloc. */
void ceph_oloc_destroy(struct ceph_object_locator *oloc)
{
        ceph_put_string(oloc->pool_ns);
}
EXPORT_SYMBOL(ceph_oloc_destroy);
1563
/*
 * Copy @src into @dest, which must be empty.  If src's name lives in
 * an external buffer, a matching buffer is allocated for dest
 * (__GFP_NOFAIL, so the copy itself cannot fail).
 */
void ceph_oid_copy(struct ceph_object_id *dest,
                   const struct ceph_object_id *src)
{
        WARN_ON(!ceph_oid_empty(dest));

        if (src->name != src->inline_name) {
                /* very rare, see ceph_object_id definition */
                dest->name = kmalloc(src->name_len + 1,
                                     GFP_NOIO | __GFP_NOFAIL);
        }

        /* +1 copies the NUL terminator too */
        memcpy(dest->name, src->name, src->name_len + 1);
        dest->name_len = src->name_len;
}
EXPORT_SYMBOL(ceph_oid_copy);
1579
/*
 * Format into oid's inline buffer.  Returns 0 and sets name_len on
 * success; returns the would-be length when the result does not fit
 * (name_len is left unset in that case).
 */
static __printf(2, 0)
int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
{
        int len;

        WARN_ON(!ceph_oid_empty(oid));

        len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
        if (len >= sizeof(oid->inline_name))
                return len;

        oid->name_len = len;
        return 0;
}
1594
1595/*
1596 * If oid doesn't fit into inline buffer, BUG.
1597 */
1598void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
1599{
1600        va_list ap;
1601
1602        va_start(ap, fmt);
1603        BUG_ON(oid_printf_vargs(oid, fmt, ap));
1604        va_end(ap);
1605}
1606EXPORT_SYMBOL(ceph_oid_printf);
1607
1608static __printf(3, 0)
1609int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
1610                      const char *fmt, va_list ap)
1611{
1612        va_list aq;
1613        int len;
1614
1615        va_copy(aq, ap);
1616        len = oid_printf_vargs(oid, fmt, aq);
1617        va_end(aq);
1618
1619        if (len) {
1620                char *external_name;
1621
1622                external_name = kmalloc(len + 1, gfp);
1623                if (!external_name)
1624                        return -ENOMEM;
1625
1626                oid->name = external_name;
1627                WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
1628                oid->name_len = len;
1629        }
1630
1631        return 0;
1632}
1633
1634/*
1635 * If oid doesn't fit into inline buffer, allocate.
1636 */
1637int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
1638                     const char *fmt, ...)
1639{
1640        va_list ap;
1641        int ret;
1642
1643        va_start(ap, fmt);
1644        ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
1645        va_end(ap);
1646
1647        return ret;
1648}
1649EXPORT_SYMBOL(ceph_oid_aprintf);
1650
1651void ceph_oid_destroy(struct ceph_object_id *oid)
1652{
1653        if (oid->name != oid->inline_name)
1654                kfree(oid->name);
1655}
1656EXPORT_SYMBOL(ceph_oid_destroy);
1657
1658/*
1659 * osds only
1660 */
1661static bool __osds_equal(const struct ceph_osds *lhs,
1662                         const struct ceph_osds *rhs)
1663{
1664        if (lhs->size == rhs->size &&
1665            !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
1666                return true;
1667
1668        return false;
1669}
1670
1671/*
1672 * osds + primary
1673 */
1674static bool osds_equal(const struct ceph_osds *lhs,
1675                       const struct ceph_osds *rhs)
1676{
1677        if (__osds_equal(lhs, rhs) &&
1678            lhs->primary == rhs->primary)
1679                return true;
1680
1681        return false;
1682}
1683
1684static bool osds_valid(const struct ceph_osds *set)
1685{
1686        /* non-empty set */
1687        if (set->size > 0 && set->primary >= 0)
1688                return true;
1689
1690        /* empty can_shift_osds set */
1691        if (!set->size && set->primary == -1)
1692                return true;
1693
1694        /* empty !can_shift_osds set - all NONE */
1695        if (set->size > 0 && set->primary == -1) {
1696                int i;
1697
1698                for (i = 0; i < set->size; i++) {
1699                        if (set->osds[i] != CRUSH_ITEM_NONE)
1700                                break;
1701                }
1702                if (i == set->size)
1703                        return true;
1704        }
1705
1706        return false;
1707}
1708
/* Copy osd vector, size and primary from @src to @dest. */
void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
        memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
        dest->size = src->size;
        dest->primary = src->primary;
}
1715
/*
 * Return true if growing pg_num from @old_pg_num to @new_pg_num
 * splits the given pg: i.e. some new pg in [old_pg_num, new_pg_num)
 * folds back (via ceph_stable_mod()) onto pgid->seed.
 */
static bool is_split(const struct ceph_pg *pgid,
                     u32 old_pg_num,
                     u32 new_pg_num)
{
        int old_bits = calc_bits_of(old_pg_num);
        int old_mask = (1 << old_bits) - 1;
        int n;

        WARN_ON(pgid->seed >= old_pg_num);
        if (new_pg_num <= old_pg_num)
                return false;

        /* walk candidate children: seed with successively higher bits set */
        for (n = 1; ; n++) {
                int next_bit = n << (old_bits - 1);
                u32 s = next_bit | pgid->seed;

                /* skip ids that already existed, or the pg itself */
                if (s < old_pg_num || s == pgid->seed)
                        continue;
                if (s >= new_pg_num)
                        break;

                s = ceph_stable_mod(s, old_pg_num, old_mask);
                if (s == pgid->seed)
                        return true;
        }

        return false;
}
1744
1745bool ceph_is_new_interval(const struct ceph_osds *old_acting,
1746                          const struct ceph_osds *new_acting,
1747                          const struct ceph_osds *old_up,
1748                          const struct ceph_osds *new_up,
1749                          int old_size,
1750                          int new_size,
1751                          int old_min_size,
1752                          int new_min_size,
1753                          u32 old_pg_num,
1754                          u32 new_pg_num,
1755                          bool old_sort_bitwise,
1756                          bool new_sort_bitwise,
1757                          const struct ceph_pg *pgid)
1758{
1759        return !osds_equal(old_acting, new_acting) ||
1760               !osds_equal(old_up, new_up) ||
1761               old_size != new_size ||
1762               old_min_size != new_min_size ||
1763               is_split(pgid, old_pg_num, new_pg_num) ||
1764               old_sort_bitwise != new_sort_bitwise;
1765}
1766
1767static int calc_pg_rank(int osd, const struct ceph_osds *acting)
1768{
1769        int i;
1770
1771        for (i = 0; i < acting->size; i++) {
1772                if (acting->osds[i] == osd)
1773                        return i;
1774        }
1775
1776        return -1;
1777}
1778
1779static bool primary_changed(const struct ceph_osds *old_acting,
1780                            const struct ceph_osds *new_acting)
1781{
1782        if (!old_acting->size && !new_acting->size)
1783                return false; /* both still empty */
1784
1785        if (!old_acting->size ^ !new_acting->size)
1786                return true; /* was empty, now not, or vice versa */
1787
1788        if (old_acting->primary != new_acting->primary)
1789                return true; /* primary changed */
1790
1791        if (calc_pg_rank(old_acting->primary, old_acting) !=
1792            calc_pg_rank(new_acting->primary, new_acting))
1793                return true;
1794
1795        return false; /* same primary (tho replicas may have changed) */
1796}
1797
1798bool ceph_osds_changed(const struct ceph_osds *old_acting,
1799                       const struct ceph_osds *new_acting,
1800                       bool any_change)
1801{
1802        if (primary_changed(old_acting, new_acting))
1803                return true;
1804
1805        if (any_change && !__osds_equal(old_acting, new_acting))
1806                return true;
1807
1808        return false;
1809}
1810
1811/*
1812 * calculate file layout from given offset, length.
1813 * fill in correct oid, logical length, and object extent
1814 * offset, length.
1815 *
1816 * for now, we write only a single su, until we can
1817 * pass a stride back to the caller.
1818 */
1819int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1820                                   u64 off, u64 len,
1821                                   u64 *ono,
1822                                   u64 *oxoff, u64 *oxlen)
1823{
1824        u32 osize = layout->object_size;
1825        u32 su = layout->stripe_unit;
1826        u32 sc = layout->stripe_count;
1827        u32 bl, stripeno, stripepos, objsetno;
1828        u32 su_per_object;
1829        u64 t, su_offset;
1830
1831        dout("mapping %llu~%llu  osize %u fl_su %u\n", off, len,
1832             osize, su);
1833        if (su == 0 || sc == 0)
1834                goto invalid;
1835        su_per_object = osize / su;
1836        if (su_per_object == 0)
1837                goto invalid;
1838        dout("osize %u / su %u = su_per_object %u\n", osize, su,
1839             su_per_object);
1840
1841        if ((su & ~PAGE_MASK) != 0)
1842                goto invalid;
1843
1844        /* bl = *off / su; */
1845        t = off;
1846        do_div(t, su);
1847        bl = t;
1848        dout("off %llu / su %u = bl %u\n", off, su, bl);
1849
1850        stripeno = bl / sc;
1851        stripepos = bl % sc;
1852        objsetno = stripeno / su_per_object;
1853
1854        *ono = objsetno * sc + stripepos;
1855        dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);
1856
1857        /* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
1858        t = off;
1859        su_offset = do_div(t, su);
1860        *oxoff = su_offset + (stripeno % su_per_object) * su;
1861
1862        /*
1863         * Calculate the length of the extent being written to the selected
1864         * object. This is the minimum of the full length requested (len) or
1865         * the remainder of the current stripe being written to.
1866         */
1867        *oxlen = min_t(u64, len, su - su_offset);
1868
1869        dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
1870        return 0;
1871
1872invalid:
1873        dout(" invalid layout\n");
1874        *ono = 0;
1875        *oxoff = 0;
1876        *oxlen = 0;
1877        return -EINVAL;
1878}
1879EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1880
1881/*
1882 * Map an object into a PG.
1883 *
1884 * Should only be called with target_oid and target_oloc (as opposed to
1885 * base_oid and base_oloc), since tiering isn't taken into account.
1886 */
1887int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
1888                              struct ceph_object_id *oid,
1889                              struct ceph_object_locator *oloc,
1890                              struct ceph_pg *raw_pgid)
1891{
1892        struct ceph_pg_pool_info *pi;
1893
1894        pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
1895        if (!pi)
1896                return -ENOENT;
1897
1898        if (!oloc->pool_ns) {
1899                raw_pgid->pool = oloc->pool;
1900                raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
1901                                             oid->name_len);
1902                dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
1903                     raw_pgid->pool, raw_pgid->seed);
1904        } else {
1905                char stack_buf[256];
1906                char *buf = stack_buf;
1907                int nsl = oloc->pool_ns->len;
1908                size_t total = nsl + 1 + oid->name_len;
1909
1910                if (total > sizeof(stack_buf)) {
1911                        buf = kmalloc(total, GFP_NOIO);
1912                        if (!buf)
1913                                return -ENOMEM;
1914                }
1915                memcpy(buf, oloc->pool_ns->str, nsl);
1916                buf[nsl] = '\037';
1917                memcpy(buf + nsl + 1, oid->name, oid->name_len);
1918                raw_pgid->pool = oloc->pool;
1919                raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
1920                if (buf != stack_buf)
1921                        kfree(buf);
1922                dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
1923                     oid->name, nsl, oloc->pool_ns->str,
1924                     raw_pgid->pool, raw_pgid->seed);
1925        }
1926        return 0;
1927}
1928EXPORT_SYMBOL(ceph_object_locator_to_pg);
1929
1930/*
1931 * Map a raw PG (full precision ps) into an actual PG.
1932 */
1933static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
1934                         const struct ceph_pg *raw_pgid,
1935                         struct ceph_pg *pgid)
1936{
1937        pgid->pool = raw_pgid->pool;
1938        pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
1939                                     pi->pg_num_mask);
1940}
1941
1942/*
1943 * Map a raw PG (full precision ps) into a placement ps (placement
1944 * seed).  Include pool id in that value so that different pools don't
1945 * use the same seeds.
1946 */
1947static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
1948                         const struct ceph_pg *raw_pgid)
1949{
1950        if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
1951                /* hash pool id and seed so that pool PGs do not overlap */
1952                return crush_hash32_2(CRUSH_HASH_RJENKINS1,
1953                                      ceph_stable_mod(raw_pgid->seed,
1954                                                      pi->pgp_num,
1955                                                      pi->pgp_num_mask),
1956                                      raw_pgid->pool);
1957        } else {
1958                /*
1959                 * legacy behavior: add ps and pool together.  this is
1960                 * not a great approach because the PGs from each pool
1961                 * will overlap on top of each other: 0.5 == 1.4 ==
1962                 * 2.3 == ...
1963                 */
1964                return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
1965                                       pi->pgp_num_mask) +
1966                       (unsigned)raw_pgid->pool;
1967        }
1968}
1969
1970static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1971                    int *result, int result_max,
1972                    const __u32 *weight, int weight_max)
1973{
1974        int r;
1975
1976        BUG_ON(result_max > CEPH_PG_MAX_SIZE);
1977
1978        mutex_lock(&map->crush_workspace_mutex);
1979        r = crush_do_rule(map->crush, ruleno, x, result, result_max,
1980                          weight, weight_max, map->crush_workspace);
1981        mutex_unlock(&map->crush_workspace_mutex);
1982
1983        return r;
1984}
1985
1986/*
1987 * Calculate raw set (CRUSH output) for given PG.  The result may
1988 * contain nonexistent OSDs.  ->primary is undefined for a raw set.
1989 *
1990 * Placement seed (CRUSH input) is returned through @ppps.
1991 */
1992static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
1993                           struct ceph_pg_pool_info *pi,
1994                           const struct ceph_pg *raw_pgid,
1995                           struct ceph_osds *raw,
1996                           u32 *ppps)
1997{
1998        u32 pps = raw_pg_to_pps(pi, raw_pgid);
1999        int ruleno;
2000        int len;
2001
2002        ceph_osds_init(raw);
2003        if (ppps)
2004                *ppps = pps;
2005
2006        ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
2007                                 pi->size);
2008        if (ruleno < 0) {
2009                pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
2010                       pi->id, pi->crush_ruleset, pi->type, pi->size);
2011                return;
2012        }
2013
2014        if (pi->size > ARRAY_SIZE(raw->osds)) {
2015                pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
2016                       pi->id, pi->crush_ruleset, pi->type, pi->size,
2017                       ARRAY_SIZE(raw->osds));
2018                return;
2019        }
2020
2021        len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
2022                       osdmap->osd_weight, osdmap->max_osd);
2023        if (len < 0) {
2024                pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
2025                       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
2026                       pi->size);
2027                return;
2028        }
2029
2030        raw->size = len;
2031}
2032
2033/*
2034 * Given raw set, calculate up set and up primary.  By definition of an
2035 * up set, the result won't contain nonexistent or down OSDs.
2036 *
2037 * This is done in-place - on return @set is the up set.  If it's
2038 * empty, ->primary will remain undefined.
2039 */
2040static void raw_to_up_osds(struct ceph_osdmap *osdmap,
2041                           struct ceph_pg_pool_info *pi,
2042                           struct ceph_osds *set)
2043{
2044        int i;
2045
2046        /* ->primary is undefined for a raw set */
2047        BUG_ON(set->primary != -1);
2048
2049        if (ceph_can_shift_osds(pi)) {
2050                int removed = 0;
2051
2052                /* shift left */
2053                for (i = 0; i < set->size; i++) {
2054                        if (ceph_osd_is_down(osdmap, set->osds[i])) {
2055                                removed++;
2056                                continue;
2057                        }
2058                        if (removed)
2059                                set->osds[i - removed] = set->osds[i];
2060                }
2061                set->size -= removed;
2062                if (set->size > 0)
2063                        set->primary = set->osds[0];
2064        } else {
2065                /* set down/dne devices to NONE */
2066                for (i = set->size - 1; i >= 0; i--) {
2067                        if (ceph_osd_is_down(osdmap, set->osds[i]))
2068                                set->osds[i] = CRUSH_ITEM_NONE;
2069                        else
2070                                set->primary = set->osds[i];
2071                }
2072        }
2073}
2074
/*
 * Apply primary-affinity to the up set: possibly replace the OSD that
 * would otherwise become primary with another member of @up, based on
 * the per-OSD primary_affinity values in @osdmap.  Leaves @up
 * untouched when the map has no affinity table or every OSD in @up
 * has the default affinity.
 */
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
                                   struct ceph_pg_pool_info *pi,
                                   u32 pps,
                                   struct ceph_osds *up)
{
        int i;
        int pos = -1;   /* index of the chosen primary in up->osds */

        /*
         * Do we have any non-default primary_affinity values for these
         * osds?
         */
        if (!osdmap->osd_primary_affinity)
                return;

        for (i = 0; i < up->size; i++) {
                int osd = up->osds[i];

                if (osd != CRUSH_ITEM_NONE &&
                    osdmap->osd_primary_affinity[osd] !=
                                        CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
                        break;
                }
        }
        /* all defaults - CRUSH's first pick stands */
        if (i == up->size)
                return;

        /*
         * Pick the primary.  Feed both the seed (for the pg) and the
         * osd into the hash/rng so that a proportional fraction of an
         * osd's pgs get rejected as primary.
         */
        for (i = 0; i < up->size; i++) {
                int osd = up->osds[i];
                u32 aff;

                if (osd == CRUSH_ITEM_NONE)
                        continue;

                aff = osdmap->osd_primary_affinity[osd];
                /* compare a 16-bit hash value against the affinity */
                if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
                    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
                                    pps, osd) >> 16) >= aff) {
                        /*
                         * We chose not to use this primary.  Note it
                         * anyway as a fallback in case we don't pick
                         * anyone else, but keep looking.
                         */
                        if (pos < 0)
                                pos = i;
                } else {
                        pos = i;
                        break;
                }
        }
        /* every slot was CRUSH_ITEM_NONE - nothing to promote */
        if (pos < 0)
                return;

        up->primary = up->osds[pos];

        if (ceph_can_shift_osds(pi) && pos > 0) {
                /* move the new primary to the front */
                for (i = pos; i > 0; i--)
                        up->osds[i] = up->osds[i - 1];
                up->osds[0] = up->primary;
        }
}
2142
2143/*
2144 * Get pg_temp and primary_temp mappings for given PG.
2145 *
2146 * Note that a PG may have none, only pg_temp, only primary_temp or
2147 * both pg_temp and primary_temp mappings.  This means @temp isn't
2148 * always a valid OSD set on return: in the "only primary_temp" case,
2149 * @temp will have its ->primary >= 0 but ->size == 0.
2150 */
2151static void get_temp_osds(struct ceph_osdmap *osdmap,
2152                          struct ceph_pg_pool_info *pi,
2153                          const struct ceph_pg *raw_pgid,
2154                          struct ceph_osds *temp)
2155{
2156        struct ceph_pg pgid;
2157        struct ceph_pg_mapping *pg;
2158        int i;
2159
2160        raw_pg_to_pg(pi, raw_pgid, &pgid);
2161        ceph_osds_init(temp);
2162
2163        /* pg_temp? */
2164        pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
2165        if (pg) {
2166                for (i = 0; i < pg->pg_temp.len; i++) {
2167                        if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2168                                if (ceph_can_shift_osds(pi))
2169                                        continue;
2170
2171                                temp->osds[temp->size++] = CRUSH_ITEM_NONE;
2172                        } else {
2173                                temp->osds[temp->size++] = pg->pg_temp.osds[i];
2174                        }
2175                }
2176
2177                /* apply pg_temp's primary */
2178                for (i = 0; i < temp->size; i++) {
2179                        if (temp->osds[i] != CRUSH_ITEM_NONE) {
2180                                temp->primary = temp->osds[i];
2181                                break;
2182                        }
2183                }
2184        }
2185
2186        /* primary_temp? */
2187        pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
2188        if (pg)
2189                temp->primary = pg->primary_temp.osd;
2190}
2191
2192/*
2193 * Map a PG to its acting set as well as its up set.
2194 *
2195 * Acting set is used for data mapping purposes, while up set can be
2196 * recorded for detecting interval changes and deciding whether to
2197 * resend a request.
2198 */
2199void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2200                               const struct ceph_pg *raw_pgid,
2201                               struct ceph_osds *up,
2202                               struct ceph_osds *acting)
2203{
2204        struct ceph_pg_pool_info *pi;
2205        u32 pps;
2206
2207        pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2208        if (!pi) {
2209                ceph_osds_init(up);
2210                ceph_osds_init(acting);
2211                goto out;
2212        }
2213
2214        pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2215        raw_to_up_osds(osdmap, pi, up);
2216        apply_primary_affinity(osdmap, pi, pps, up);
2217        get_temp_osds(osdmap, pi, raw_pgid, acting);
2218        if (!acting->size) {
2219                memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2220                acting->size = up->size;
2221                if (acting->primary == -1)
2222                        acting->primary = up->primary;
2223        }
2224out:
2225        WARN_ON(!osds_valid(up) || !osds_valid(acting));
2226}
2227
2228/*
2229 * Return acting primary for given PG, or -1 if none.
2230 */
2231int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2232                              const struct ceph_pg *raw_pgid)
2233{
2234        struct ceph_osds up, acting;
2235
2236        ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting);
2237        return acting.primary;
2238}
2239EXPORT_SYMBOL(ceph_pg_to_acting_primary);
2240