linux/net/ceph/osdmap.c
// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/slab.h>

#include <linux/ceph/libceph.h>
#include <linux/ceph/osdmap.h>
#include <linux/ceph/decode.h>
#include <linux/crush/hash.h>
#include <linux/crush/mapper.h>

char *ceph_osdmap_state_str(char *str, int len, u32 state)
{
        if (!len)
                return str;

        if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
                snprintf(str, len, "exists, up");
        else if (state & CEPH_OSD_EXISTS)
                snprintf(str, len, "exists");
        else if (state & CEPH_OSD_UP)
                snprintf(str, len, "up");
        else
                snprintf(str, len, "doesn't exist");

        return str;
}

/* maps */

static int calc_bits_of(unsigned int t)
{
        int b = 0;
        while (t) {
                t = t >> 1;
                b++;
        }
        return b;
}

/*
 * foo_mask is 2^n - 1, where 2^n is the smallest power of two >= foo.
 */
static void calc_pg_masks(struct ceph_pg_pool_info *pi)
{
        pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
        pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
}
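
/*
 * Example: for pg_num = 12, calc_bits_of(11) = 4, so pg_num_mask =
 * (1 << 4) - 1 = 15.  ceph_stable_mod() (include/linux/ceph/osdmap.h)
 * later uses these masks to fold a raw placement seed into
 * [0, pg_num).
 */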

/*
 * decode crush map
 */
static int crush_decode_uniform_bucket(void **p, void *end,
                                       struct crush_bucket_uniform *b)
{
        dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
        ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
        b->item_weight = ceph_decode_32(p);
        return 0;
bad:
        return -EINVAL;
}

static int crush_decode_list_bucket(void **p, void *end,
                                    struct crush_bucket_list *b)
{
        int j;
        dout("crush_decode_list_bucket %p to %p\n", *p, end);
        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->item_weights == NULL)
                return -ENOMEM;
        b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->sum_weights == NULL)
                return -ENOMEM;
        ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
        for (j = 0; j < b->h.size; j++) {
                b->item_weights[j] = ceph_decode_32(p);
                b->sum_weights[j] = ceph_decode_32(p);
        }
        return 0;
bad:
        return -EINVAL;
}

static int crush_decode_tree_bucket(void **p, void *end,
                                    struct crush_bucket_tree *b)
{
        int j;
        dout("crush_decode_tree_bucket %p to %p\n", *p, end);
        ceph_decode_8_safe(p, end, b->num_nodes, bad);
        b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
        if (b->node_weights == NULL)
                return -ENOMEM;
        ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
        for (j = 0; j < b->num_nodes; j++)
                b->node_weights[j] = ceph_decode_32(p);
        return 0;
bad:
        return -EINVAL;
}

static int crush_decode_straw_bucket(void **p, void *end,
                                     struct crush_bucket_straw *b)
{
        int j;
        dout("crush_decode_straw_bucket %p to %p\n", *p, end);
        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->item_weights == NULL)
                return -ENOMEM;
        b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->straws == NULL)
                return -ENOMEM;
        ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
        for (j = 0; j < b->h.size; j++) {
                b->item_weights[j] = ceph_decode_32(p);
                b->straws[j] = ceph_decode_32(p);
        }
        return 0;
bad:
        return -EINVAL;
}

static int crush_decode_straw2_bucket(void **p, void *end,
                                      struct crush_bucket_straw2 *b)
{
        int j;
        dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
        b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
        if (b->item_weights == NULL)
                return -ENOMEM;
        ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
        for (j = 0; j < b->h.size; j++)
                b->item_weights[j] = ceph_decode_32(p);
        return 0;
bad:
        return -EINVAL;
}

struct crush_name_node {
        struct rb_node cn_node;
        int cn_id;
        char cn_name[];
};

static struct crush_name_node *alloc_crush_name(size_t name_len)
{
        struct crush_name_node *cn;

        cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
        if (!cn)
                return NULL;

        RB_CLEAR_NODE(&cn->cn_node);
        return cn;
}

static void free_crush_name(struct crush_name_node *cn)
{
        WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));

        kfree(cn);
}

DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
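
/*
 * DEFINE_RB_FUNCS() (include/linux/ceph/libceph.h) expands to the
 * rbtree boilerplate used below: __insert_crush_name(),
 * insert_crush_name(), erase_crush_name() and lookup_crush_name(),
 * keyed on cn_id.
 */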

static int decode_crush_names(void **p, void *end, struct rb_root *root)
{
        u32 n;

        ceph_decode_32_safe(p, end, n, e_inval);
        while (n--) {
                struct crush_name_node *cn;
                int id;
                u32 name_len;

                ceph_decode_32_safe(p, end, id, e_inval);
                ceph_decode_32_safe(p, end, name_len, e_inval);
                ceph_decode_need(p, end, name_len, e_inval);

                cn = alloc_crush_name(name_len);
                if (!cn)
                        return -ENOMEM;

                cn->cn_id = id;
                memcpy(cn->cn_name, *p, name_len);
                cn->cn_name[name_len] = '\0';
                *p += name_len;

                if (!__insert_crush_name(root, cn)) {
                        free_crush_name(cn);
                        return -EEXIST;
                }
        }

        return 0;

e_inval:
        return -EINVAL;
}

void clear_crush_names(struct rb_root *root)
{
        while (!RB_EMPTY_ROOT(root)) {
                struct crush_name_node *cn =
                    rb_entry(rb_first(root), struct crush_name_node, cn_node);

                erase_crush_name(root, cn);
                free_crush_name(cn);
        }
}

static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
        struct crush_choose_arg_map *arg_map;

        arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
        if (!arg_map)
                return NULL;

        RB_CLEAR_NODE(&arg_map->node);
        return arg_map;
}

static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
{
        if (arg_map) {
                int i, j;

                WARN_ON(!RB_EMPTY_NODE(&arg_map->node));

                for (i = 0; i < arg_map->size; i++) {
                        struct crush_choose_arg *arg = &arg_map->args[i];

                        for (j = 0; j < arg->weight_set_size; j++)
                                kfree(arg->weight_set[j].weights);
                        kfree(arg->weight_set);
                        kfree(arg->ids);
                }
                kfree(arg_map->args);
                kfree(arg_map);
        }
}

DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
                node);

void clear_choose_args(struct crush_map *c)
{
        while (!RB_EMPTY_ROOT(&c->choose_args)) {
                struct crush_choose_arg_map *arg_map =
                    rb_entry(rb_first(&c->choose_args),
                             struct crush_choose_arg_map, node);

                erase_choose_arg_map(&c->choose_args, arg_map);
                free_choose_arg_map(arg_map);
        }
}

static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
{
        u32 *a = NULL;
        u32 len;
        int ret;

        ceph_decode_32_safe(p, end, len, e_inval);
        if (len) {
                u32 i;

                a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
                if (!a) {
                        ret = -ENOMEM;
                        goto fail;
                }

                ceph_decode_need(p, end, len * sizeof(u32), e_inval);
                for (i = 0; i < len; i++)
                        a[i] = ceph_decode_32(p);
        }

        *plen = len;
        return a;

e_inval:
        ret = -EINVAL;
fail:
        kfree(a);
        return ERR_PTR(ret);
}
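
/*
 * Note the calling convention: decode_array_32_alloc() returns an
 * ERR_PTR() on failure, never NULL, so callers must reset their
 * pointer to NULL before bailing out (as decode_choose_arg() does
 * below); otherwise the kfree() in free_choose_arg_map() would be
 * handed an ERR_PTR value.
 */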

/*
 * Assumes @arg is zero-initialized.
 */
static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
{
        int ret;

        ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
        if (arg->weight_set_size) {
                u32 i;

                arg->weight_set = kmalloc_array(arg->weight_set_size,
                                                sizeof(*arg->weight_set),
                                                GFP_NOIO);
                if (!arg->weight_set)
                        return -ENOMEM;

                for (i = 0; i < arg->weight_set_size; i++) {
                        struct crush_weight_set *w = &arg->weight_set[i];

                        w->weights = decode_array_32_alloc(p, end, &w->size);
                        if (IS_ERR(w->weights)) {
                                ret = PTR_ERR(w->weights);
                                w->weights = NULL;
                                return ret;
                        }
                }
        }

        arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
        if (IS_ERR(arg->ids)) {
                ret = PTR_ERR(arg->ids);
                arg->ids = NULL;
                return ret;
        }

        return 0;

e_inval:
        return -EINVAL;
}

static int decode_choose_args(void **p, void *end, struct crush_map *c)
{
        struct crush_choose_arg_map *arg_map = NULL;
        u32 num_choose_arg_maps, num_buckets;
        int ret;

        ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
        while (num_choose_arg_maps--) {
                arg_map = alloc_choose_arg_map();
                if (!arg_map) {
                        ret = -ENOMEM;
                        goto fail;
                }

                ceph_decode_64_safe(p, end, arg_map->choose_args_index,
                                    e_inval);
                arg_map->size = c->max_buckets;
                arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
                                        GFP_NOIO);
                if (!arg_map->args) {
                        ret = -ENOMEM;
                        goto fail;
                }

                ceph_decode_32_safe(p, end, num_buckets, e_inval);
                while (num_buckets--) {
                        struct crush_choose_arg *arg;
                        u32 bucket_index;

                        ceph_decode_32_safe(p, end, bucket_index, e_inval);
                        if (bucket_index >= arg_map->size)
                                goto e_inval;

                        arg = &arg_map->args[bucket_index];
                        ret = decode_choose_arg(p, end, arg);
                        if (ret)
                                goto fail;

                        if (arg->ids_size &&
                            arg->ids_size != c->buckets[bucket_index]->size)
                                goto e_inval;
                }

                insert_choose_arg_map(&c->choose_args, arg_map);
        }

        return 0;

e_inval:
        ret = -EINVAL;
fail:
        free_choose_arg_map(arg_map);
        return ret;
}

static void crush_finalize(struct crush_map *c)
{
        __s32 b;

        /* Space for the array of pointers to per-bucket workspace */
        c->working_size = sizeof(struct crush_work) +
            c->max_buckets * sizeof(struct crush_work_bucket *);

        for (b = 0; b < c->max_buckets; b++) {
                if (!c->buckets[b])
                        continue;

                switch (c->buckets[b]->alg) {
                default:
                        /*
                         * The base case, permutation variables and
                         * the pointer to the permutation array.
                         */
                        c->working_size += sizeof(struct crush_work_bucket);
                        break;
                }
                /* Every bucket has a permutation array. */
                c->working_size += c->buckets[b]->size * sizeof(__u32);
        }
}
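
/*
 * For example, a map with max_buckets = 4 and two live buckets of
 * size 3 and 5 ends up with working_size = sizeof(struct crush_work)
 * + 4 * sizeof(struct crush_work_bucket *)
 * + 2 * sizeof(struct crush_work_bucket) + (3 + 5) * sizeof(__u32)
 * bytes of scratch space per CRUSH computation.
 */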

static struct crush_map *crush_decode(void *pbyval, void *end)
{
        struct crush_map *c;
        int err;
        int i, j;
        void **p = &pbyval;
        void *start = pbyval;
        u32 magic;

        dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));

        c = kzalloc(sizeof(*c), GFP_NOFS);
        if (c == NULL)
                return ERR_PTR(-ENOMEM);

        c->type_names = RB_ROOT;
        c->names = RB_ROOT;
        c->choose_args = RB_ROOT;

        /* set tunables to default values */
        c->choose_local_tries = 2;
        c->choose_local_fallback_tries = 5;
        c->choose_total_tries = 19;
        c->chooseleaf_descend_once = 0;

        ceph_decode_need(p, end, 4*sizeof(u32), bad);
        magic = ceph_decode_32(p);
        if (magic != CRUSH_MAGIC) {
                pr_err("crush_decode magic %x != current %x\n",
                       (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
                goto bad;
        }
        c->max_buckets = ceph_decode_32(p);
        c->max_rules = ceph_decode_32(p);
        c->max_devices = ceph_decode_32(p);

        c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
        if (c->buckets == NULL)
                goto badmem;
        c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
        if (c->rules == NULL)
                goto badmem;

        /* buckets */
        for (i = 0; i < c->max_buckets; i++) {
                int size = 0;
                u32 alg;
                struct crush_bucket *b;

                ceph_decode_32_safe(p, end, alg, bad);
                if (alg == 0) {
                        c->buckets[i] = NULL;
                        continue;
                }
                dout("crush_decode bucket %d off %x %p to %p\n",
                     i, (int)(*p-start), *p, end);

                switch (alg) {
                case CRUSH_BUCKET_UNIFORM:
                        size = sizeof(struct crush_bucket_uniform);
                        break;
                case CRUSH_BUCKET_LIST:
                        size = sizeof(struct crush_bucket_list);
                        break;
                case CRUSH_BUCKET_TREE:
                        size = sizeof(struct crush_bucket_tree);
                        break;
                case CRUSH_BUCKET_STRAW:
                        size = sizeof(struct crush_bucket_straw);
                        break;
                case CRUSH_BUCKET_STRAW2:
                        size = sizeof(struct crush_bucket_straw2);
                        break;
                default:
                        goto bad;
                }
                BUG_ON(size == 0);
                b = c->buckets[i] = kzalloc(size, GFP_NOFS);
                if (b == NULL)
                        goto badmem;

                ceph_decode_need(p, end, 4*sizeof(u32), bad);
                b->id = ceph_decode_32(p);
                b->type = ceph_decode_16(p);
                b->alg = ceph_decode_8(p);
                b->hash = ceph_decode_8(p);
                b->weight = ceph_decode_32(p);
                b->size = ceph_decode_32(p);

                dout("crush_decode bucket size %d off %x %p to %p\n",
                     b->size, (int)(*p-start), *p, end);

                b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
                if (b->items == NULL)
                        goto badmem;

                ceph_decode_need(p, end, b->size*sizeof(u32), bad);
                for (j = 0; j < b->size; j++)
                        b->items[j] = ceph_decode_32(p);

                switch (b->alg) {
                case CRUSH_BUCKET_UNIFORM:
                        err = crush_decode_uniform_bucket(p, end,
                                  (struct crush_bucket_uniform *)b);
                        if (err < 0)
                                goto fail;
                        break;
                case CRUSH_BUCKET_LIST:
                        err = crush_decode_list_bucket(p, end,
                               (struct crush_bucket_list *)b);
                        if (err < 0)
                                goto fail;
                        break;
                case CRUSH_BUCKET_TREE:
                        err = crush_decode_tree_bucket(p, end,
                                (struct crush_bucket_tree *)b);
                        if (err < 0)
                                goto fail;
                        break;
                case CRUSH_BUCKET_STRAW:
                        err = crush_decode_straw_bucket(p, end,
                                (struct crush_bucket_straw *)b);
                        if (err < 0)
                                goto fail;
                        break;
                case CRUSH_BUCKET_STRAW2:
                        err = crush_decode_straw2_bucket(p, end,
                                (struct crush_bucket_straw2 *)b);
                        if (err < 0)
                                goto fail;
                        break;
                }
        }

        /* rules */
        dout("rule vec is %p\n", c->rules);
        for (i = 0; i < c->max_rules; i++) {
                u32 yes;
                struct crush_rule *r;

                ceph_decode_32_safe(p, end, yes, bad);
                if (!yes) {
                        dout("crush_decode NO rule %d off %x %p to %p\n",
                             i, (int)(*p-start), *p, end);
                        c->rules[i] = NULL;
                        continue;
                }

                dout("crush_decode rule %d off %x %p to %p\n",
                     i, (int)(*p-start), *p, end);

                /* len */
                ceph_decode_32_safe(p, end, yes, bad);
#if BITS_PER_LONG == 32
                if (yes > (ULONG_MAX - sizeof(*r))
                          / sizeof(struct crush_rule_step))
                        goto bad;
#endif
                r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
                c->rules[i] = r;
                if (r == NULL)
                        goto badmem;
                dout(" rule %d is at %p\n", i, r);
                r->len = yes;
                ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
                ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
                for (j = 0; j < r->len; j++) {
                        r->steps[j].op = ceph_decode_32(p);
                        r->steps[j].arg1 = ceph_decode_32(p);
                        r->steps[j].arg2 = ceph_decode_32(p);
                }
        }

        err = decode_crush_names(p, end, &c->type_names);
        if (err)
                goto fail;

        err = decode_crush_names(p, end, &c->names);
        if (err)
                goto fail;

        ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */

        /* tunables */
        ceph_decode_need(p, end, 3*sizeof(u32), done);
        c->choose_local_tries = ceph_decode_32(p);
        c->choose_local_fallback_tries = ceph_decode_32(p);
        c->choose_total_tries = ceph_decode_32(p);
        dout("crush decode tunable choose_local_tries = %d\n",
             c->choose_local_tries);
        dout("crush decode tunable choose_local_fallback_tries = %d\n",
             c->choose_local_fallback_tries);
        dout("crush decode tunable choose_total_tries = %d\n",
             c->choose_total_tries);

        ceph_decode_need(p, end, sizeof(u32), done);
        c->chooseleaf_descend_once = ceph_decode_32(p);
        dout("crush decode tunable chooseleaf_descend_once = %d\n",
             c->chooseleaf_descend_once);

        ceph_decode_need(p, end, sizeof(u8), done);
        c->chooseleaf_vary_r = ceph_decode_8(p);
        dout("crush decode tunable chooseleaf_vary_r = %d\n",
             c->chooseleaf_vary_r);

        /* skip straw_calc_version, allowed_bucket_algs */
        ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
        *p += sizeof(u8) + sizeof(u32);

        ceph_decode_need(p, end, sizeof(u8), done);
        c->chooseleaf_stable = ceph_decode_8(p);
        dout("crush decode tunable chooseleaf_stable = %d\n",
             c->chooseleaf_stable);

        if (*p != end) {
                /* class_map */
                ceph_decode_skip_map(p, end, 32, 32, bad);
                /* class_name */
                ceph_decode_skip_map(p, end, 32, string, bad);
                /* class_bucket */
                ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
        }

        if (*p != end) {
                err = decode_choose_args(p, end, c);
                if (err)
                        goto fail;
        }

done:
        crush_finalize(c);
        dout("crush_decode success\n");
        return c;

badmem:
        err = -ENOMEM;
fail:
        dout("crush_decode fail %d\n", err);
        crush_destroy(c);
        return ERR_PTR(err);

bad:
        err = -EINVAL;
        goto fail;
}
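
/*
 * Note that the bucket decode helpers above may return -ENOMEM with
 * partial allocations still attached to the map: every error path
 * funnels through the fail label, and crush_destroy() frees whatever
 * part of the half-built map was already populated.
 */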

int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
{
        if (lhs->pool < rhs->pool)
                return -1;
        if (lhs->pool > rhs->pool)
                return 1;
        if (lhs->seed < rhs->seed)
                return -1;
        if (lhs->seed > rhs->seed)
                return 1;

        return 0;
}

int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
{
        int ret;

        ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
        if (ret)
                return ret;

        if (lhs->shard < rhs->shard)
                return -1;
        if (lhs->shard > rhs->shard)
                return 1;

        return 0;
}

static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
{
        struct ceph_pg_mapping *pg;

        pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
        if (!pg)
                return NULL;

        RB_CLEAR_NODE(&pg->node);
        return pg;
}

static void free_pg_mapping(struct ceph_pg_mapping *pg)
{
        WARN_ON(!RB_EMPTY_NODE(&pg->node));

        kfree(pg);
}

/*
 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
 * to a set of osds) and primary_temp (explicit primary setting)
 */
DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
                 RB_BYPTR, const struct ceph_pg *, node)

/*
 * rbtree of pg pool info
 */
DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)

struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
{
        return lookup_pg_pool(&map->pg_pools, id);
}

const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
{
        struct ceph_pg_pool_info *pi;

        if (id == CEPH_NOPOOL)
                return NULL;

        if (WARN_ON_ONCE(id > (u64) INT_MAX))
                return NULL;

        pi = lookup_pg_pool(&map->pg_pools, id);
        return pi ? pi->name : NULL;
}
EXPORT_SYMBOL(ceph_pg_pool_name_by_id);

int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
        struct rb_node *rbp;

        for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
                struct ceph_pg_pool_info *pi =
                        rb_entry(rbp, struct ceph_pg_pool_info, node);
                if (pi->name && strcmp(pi->name, name) == 0)
                        return pi->id;
        }
        return -ENOENT;
}
EXPORT_SYMBOL(ceph_pg_poolid_by_name);

u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
{
        struct ceph_pg_pool_info *pi;

        pi = lookup_pg_pool(&map->pg_pools, id);
        return pi ? pi->flags : 0;
}
EXPORT_SYMBOL(ceph_pg_pool_flags);

static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
{
        erase_pg_pool(root, pi);
        kfree(pi->name);
        kfree(pi);
}

static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
        u8 ev, cv;
        unsigned len, num;
        void *pool_end;

        ceph_decode_need(p, end, 2 + 4, bad);
        ev = ceph_decode_8(p);  /* encoding version */
        cv = ceph_decode_8(p);  /* compat version */
        if (ev < 5) {
                pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
                return -EINVAL;
        }
        if (cv > 9) {
                pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
                return -EINVAL;
        }
        len = ceph_decode_32(p);
        ceph_decode_need(p, end, len, bad);
        pool_end = *p + len;

        pi->type = ceph_decode_8(p);
        pi->size = ceph_decode_8(p);
        pi->crush_ruleset = ceph_decode_8(p);
        pi->object_hash = ceph_decode_8(p);

        pi->pg_num = ceph_decode_32(p);
        pi->pgp_num = ceph_decode_32(p);

        *p += 4 + 4;  /* skip lpg* */
        *p += 4;      /* skip last_change */
        *p += 8 + 4;  /* skip snap_seq, snap_epoch */

        /* skip snaps */
        num = ceph_decode_32(p);
        while (num--) {
                *p += 8;  /* snapid key */
                *p += 1 + 1; /* versions */
                len = ceph_decode_32(p);
                *p += len;
        }

        /* skip removed_snaps */
        num = ceph_decode_32(p);
        *p += num * (8 + 8);

        *p += 8;  /* skip auid */
        pi->flags = ceph_decode_64(p);
        *p += 4;  /* skip crash_replay_interval */

        if (ev >= 7)
                pi->min_size = ceph_decode_8(p);
        else
                pi->min_size = pi->size - pi->size / 2;

        if (ev >= 8)
                *p += 8 + 8;  /* skip quota_max_* */

        if (ev >= 9) {
                /* skip tiers */
                num = ceph_decode_32(p);
                *p += num * 8;

                *p += 8;  /* skip tier_of */
                *p += 1;  /* skip cache_mode */

                pi->read_tier = ceph_decode_64(p);
                pi->write_tier = ceph_decode_64(p);
        } else {
                pi->read_tier = -1;
                pi->write_tier = -1;
        }

        if (ev >= 10) {
                /* skip properties */
                num = ceph_decode_32(p);
                while (num--) {
                        len = ceph_decode_32(p);
                        *p += len; /* key */
                        len = ceph_decode_32(p);
                        *p += len; /* val */
                }
        }

        if (ev >= 11) {
                /* skip hit_set_params */
                *p += 1 + 1; /* versions */
                len = ceph_decode_32(p);
                *p += len;

                *p += 4; /* skip hit_set_period */
                *p += 4; /* skip hit_set_count */
        }

        if (ev >= 12)
                *p += 4; /* skip stripe_width */

        if (ev >= 13) {
                *p += 8; /* skip target_max_bytes */
                *p += 8; /* skip target_max_objects */
                *p += 4; /* skip cache_target_dirty_ratio_micro */
                *p += 4; /* skip cache_target_full_ratio_micro */
                *p += 4; /* skip cache_min_flush_age */
                *p += 4; /* skip cache_min_evict_age */
        }

        if (ev >= 14) {
                /* skip erasure_code_profile */
                len = ceph_decode_32(p);
                *p += len;
        }

        /*
         * last_force_op_resend_preluminous, will be overridden if the
         * map was encoded with RESEND_ON_SPLIT
         */
        if (ev >= 15)
                pi->last_force_request_resend = ceph_decode_32(p);
        else
                pi->last_force_request_resend = 0;

        if (ev >= 16)
                *p += 4; /* skip min_read_recency_for_promote */

        if (ev >= 17)
                *p += 8; /* skip expected_num_objects */

        if (ev >= 19)
                *p += 4; /* skip cache_target_dirty_high_ratio_micro */

        if (ev >= 20)
                *p += 4; /* skip min_write_recency_for_promote */

        if (ev >= 21)
                *p += 1; /* skip use_gmt_hitset */

        if (ev >= 22)
                *p += 1; /* skip fast_read */

        if (ev >= 23) {
                *p += 4; /* skip hit_set_grade_decay_rate */
                *p += 4; /* skip hit_set_search_last_n */
        }

        if (ev >= 24) {
                /* skip opts */
                *p += 1 + 1; /* versions */
                len = ceph_decode_32(p);
                *p += len;
        }

        if (ev >= 25)
                pi->last_force_request_resend = ceph_decode_32(p);

        /* ignore the rest */

        *p = pool_end;
        calc_pg_masks(pi);
        return 0;

bad:
        return -EINVAL;
}

static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
        struct ceph_pg_pool_info *pi;
        u32 num, len;
        u64 pool;

        ceph_decode_32_safe(p, end, num, bad);
        dout(" %d pool names\n", num);
        while (num--) {
                ceph_decode_64_safe(p, end, pool, bad);
                ceph_decode_32_safe(p, end, len, bad);
                dout("  pool %llu len %d\n", pool, len);
                ceph_decode_need(p, end, len, bad);
                pi = lookup_pg_pool(&map->pg_pools, pool);
                if (pi) {
                        char *name = kstrndup(*p, len, GFP_NOFS);

                        if (!name)
                                return -ENOMEM;
                        kfree(pi->name);
                        pi->name = name;
                        dout("  name is %s\n", pi->name);
                }
                *p += len;
        }
        return 0;

bad:
        return -EINVAL;
}

/*
 * CRUSH workspaces
 *
 * workspace_manager framework borrowed from fs/btrfs/compression.c.
 * Two simplifications: there is only one type of workspace and there
 * is always at least one workspace.
 */
static struct crush_work *alloc_workspace(const struct crush_map *c)
{
        struct crush_work *work;
        size_t work_size;

        WARN_ON(!c->working_size);
        work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
        dout("%s work_size %zu bytes\n", __func__, work_size);

        work = ceph_kvmalloc(work_size, GFP_NOIO);
        if (!work)
                return NULL;

        INIT_LIST_HEAD(&work->item);
        crush_init_workspace(c, work);
        return work;
}

static void free_workspace(struct crush_work *work)
{
        WARN_ON(!list_empty(&work->item));
        kvfree(work);
}

static void init_workspace_manager(struct workspace_manager *wsm)
{
        INIT_LIST_HEAD(&wsm->idle_ws);
        spin_lock_init(&wsm->ws_lock);
        atomic_set(&wsm->total_ws, 0);
        wsm->free_ws = 0;
        init_waitqueue_head(&wsm->ws_wait);
}

static void add_initial_workspace(struct workspace_manager *wsm,
                                  struct crush_work *work)
{
        WARN_ON(!list_empty(&wsm->idle_ws));

        list_add(&work->item, &wsm->idle_ws);
        atomic_set(&wsm->total_ws, 1);
        wsm->free_ws = 1;
}

static void cleanup_workspace_manager(struct workspace_manager *wsm)
{
        struct crush_work *work;

        while (!list_empty(&wsm->idle_ws)) {
                work = list_first_entry(&wsm->idle_ws, struct crush_work,
                                        item);
                list_del_init(&work->item);
                free_workspace(work);
        }
        atomic_set(&wsm->total_ws, 0);
        wsm->free_ws = 0;
}

/*
 * Finds an available workspace or allocates a new one.  If it's not
 * possible to allocate a new one, waits until there is one.
 */
static struct crush_work *get_workspace(struct workspace_manager *wsm,
                                        const struct crush_map *c)
{
        struct crush_work *work;
        int cpus = num_online_cpus();

again:
        spin_lock(&wsm->ws_lock);
        if (!list_empty(&wsm->idle_ws)) {
                work = list_first_entry(&wsm->idle_ws, struct crush_work,
                                        item);
                list_del_init(&work->item);
                wsm->free_ws--;
                spin_unlock(&wsm->ws_lock);
                return work;
        }
        if (atomic_read(&wsm->total_ws) > cpus) {
                DEFINE_WAIT(wait);

                spin_unlock(&wsm->ws_lock);
                prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
                if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
                        schedule();
                finish_wait(&wsm->ws_wait, &wait);
                goto again;
        }
        atomic_inc(&wsm->total_ws);
        spin_unlock(&wsm->ws_lock);

        work = alloc_workspace(c);
        if (!work) {
                atomic_dec(&wsm->total_ws);
                wake_up(&wsm->ws_wait);

                /*
                 * Do not return the error but go back to waiting.  We
                 * have the initial workspace and the CRUSH computation
                 * time is bounded so we will get it eventually.
                 */
                WARN_ON(atomic_read(&wsm->total_ws) < 1);
                goto again;
        }
        return work;
}

/*
 * Puts a workspace back on the list or frees it if we have enough
 * idle ones sitting around.
 */
static void put_workspace(struct workspace_manager *wsm,
                          struct crush_work *work)
{
        spin_lock(&wsm->ws_lock);
        if (wsm->free_ws <= num_online_cpus()) {
                list_add(&work->item, &wsm->idle_ws);
                wsm->free_ws++;
                spin_unlock(&wsm->ws_lock);
                goto wake;
        }
        spin_unlock(&wsm->ws_lock);

        free_workspace(work);
        atomic_dec(&wsm->total_ws);
wake:
        if (wq_has_sleeper(&wsm->ws_wait))
                wake_up(&wsm->ws_wait);
}
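
/*
 * A minimal usage sketch (not a verbatim caller): mapping code grabs
 * a workspace for the duration of one CRUSH computation and returns
 * it afterwards, e.g.
 *
 *      work = get_workspace(&map->crush_wsm, map->crush);
 *      crush_do_rule(map->crush, ruleno, x, result, result_max,
 *                    weights, weight_max, work, choose_args);
 *      put_workspace(&map->crush_wsm, work);
 *
 * get_workspace() never fails: at worst it sleeps until the initial
 * workspace added by osdmap_set_crush() is put back.
 */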

/*
 * osd map
 */
struct ceph_osdmap *ceph_osdmap_alloc(void)
{
        struct ceph_osdmap *map;

        map = kzalloc(sizeof(*map), GFP_NOIO);
        if (!map)
                return NULL;

        map->pg_pools = RB_ROOT;
        map->pool_max = -1;
        map->pg_temp = RB_ROOT;
        map->primary_temp = RB_ROOT;
        map->pg_upmap = RB_ROOT;
        map->pg_upmap_items = RB_ROOT;

        init_workspace_manager(&map->crush_wsm);

        return map;
}

void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
        dout("osdmap_destroy %p\n", map);

        if (map->crush)
                crush_destroy(map->crush);
        cleanup_workspace_manager(&map->crush_wsm);

        while (!RB_EMPTY_ROOT(&map->pg_temp)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->pg_temp),
                                 struct ceph_pg_mapping, node);
                erase_pg_mapping(&map->pg_temp, pg);
                free_pg_mapping(pg);
        }
        while (!RB_EMPTY_ROOT(&map->primary_temp)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->primary_temp),
                                 struct ceph_pg_mapping, node);
                erase_pg_mapping(&map->primary_temp, pg);
                free_pg_mapping(pg);
        }
        while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->pg_upmap),
                                 struct ceph_pg_mapping, node);
                rb_erase(&pg->node, &map->pg_upmap);
                kfree(pg);
        }
        while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->pg_upmap_items),
                                 struct ceph_pg_mapping, node);
                rb_erase(&pg->node, &map->pg_upmap_items);
                kfree(pg);
        }
        while (!RB_EMPTY_ROOT(&map->pg_pools)) {
                struct ceph_pg_pool_info *pi =
                        rb_entry(rb_first(&map->pg_pools),
                                 struct ceph_pg_pool_info, node);
                __remove_pg_pool(&map->pg_pools, pi);
        }
        kvfree(map->osd_state);
        kvfree(map->osd_weight);
        kvfree(map->osd_addr);
        kvfree(map->osd_primary_affinity);
        kfree(map);
}

/*
 * Adjust max_osd value, (re)allocate arrays.
 *
 * The new elements are properly initialized.
 */
static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
{
        u32 *state;
        u32 *weight;
        struct ceph_entity_addr *addr;
        u32 to_copy;
        int i;

        dout("%s old %u new %u\n", __func__, map->max_osd, max);
        if (max == map->max_osd)
                return 0;

        state = ceph_kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
        weight = ceph_kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
        addr = ceph_kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
        if (!state || !weight || !addr) {
                kvfree(state);
                kvfree(weight);
                kvfree(addr);
                return -ENOMEM;
        }

        to_copy = min(map->max_osd, max);
        if (map->osd_state) {
                memcpy(state, map->osd_state, to_copy * sizeof(*state));
                memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
                memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
                kvfree(map->osd_state);
                kvfree(map->osd_weight);
                kvfree(map->osd_addr);
        }

        map->osd_state = state;
        map->osd_weight = weight;
        map->osd_addr = addr;
        for (i = map->max_osd; i < max; i++) {
                map->osd_state[i] = 0;
                map->osd_weight[i] = CEPH_OSD_OUT;
                memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
        }

        if (map->osd_primary_affinity) {
                u32 *affinity;

                affinity = ceph_kvmalloc(array_size(max, sizeof(*affinity)),
                                         GFP_NOFS);
                if (!affinity)
                        return -ENOMEM;

                memcpy(affinity, map->osd_primary_affinity,
                       to_copy * sizeof(*affinity));
                kvfree(map->osd_primary_affinity);

                map->osd_primary_affinity = affinity;
                for (i = map->max_osd; i < max; i++)
                        map->osd_primary_affinity[i] =
                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
        }

        map->max_osd = max;

        return 0;
}
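
/*
 * New slots start with state 0 and weight CEPH_OSD_OUT, so an OSD the
 * map has not described yet reads as nonexistent and out (no data
 * mapped to it) until a later section fills it in.
 */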

static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
        struct crush_work *work;

        if (IS_ERR(crush))
                return PTR_ERR(crush);

        work = alloc_workspace(crush);
        if (!work) {
                crush_destroy(crush);
                return -ENOMEM;
        }

        if (map->crush)
                crush_destroy(map->crush);
        cleanup_workspace_manager(&map->crush_wsm);
        map->crush = crush;
        add_initial_workspace(&map->crush_wsm, work);
        return 0;
}

#define OSDMAP_WRAPPER_COMPAT_VER       7
#define OSDMAP_CLIENT_DATA_COMPAT_VER   1

/*
 * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
 * to struct_v of the client_data section for new (v7 and above)
 * osdmaps.
 */
static int get_osdmap_client_data_v(void **p, void *end,
                                    const char *prefix, u8 *v)
{
        u8 struct_v;

        ceph_decode_8_safe(p, end, struct_v, e_inval);
        if (struct_v >= 7) {
                u8 struct_compat;

                ceph_decode_8_safe(p, end, struct_compat, e_inval);
                if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
                        pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
                                struct_v, struct_compat,
                                OSDMAP_WRAPPER_COMPAT_VER, prefix);
                        return -EINVAL;
                }
                *p += 4; /* ignore wrapper struct_len */

                ceph_decode_8_safe(p, end, struct_v, e_inval);
                ceph_decode_8_safe(p, end, struct_compat, e_inval);
                if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
                        pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
                                struct_v, struct_compat,
                                OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
                        return -EINVAL;
                }
                *p += 4; /* ignore client data struct_len */
        } else {
                u16 version;

                *p -= 1;
                ceph_decode_16_safe(p, end, version, e_inval);
                if (version < 6) {
                        pr_warn("got v %d < 6 of %s ceph_osdmap\n",
                                version, prefix);
                        return -EINVAL;
                }
                /* old osdmap encoding */
                struct_v = 0;
        }

        *v = struct_v;
        return 0;

e_inval:
        return -EINVAL;
}
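
/*
 * For reference, the wrapper layout parsed above (new-style, v7+):
 *
 *      u8 struct_v, u8 struct_compat, u32 struct_len    (wrapper)
 *      u8 struct_v, u8 struct_compat, u32 struct_len    (client data)
 *      ... client-usable fields ...
 *
 * Old-style (v6) maps instead start with a single le16 version, which
 * is why *p is rewound by one byte before re-reading it above.
 */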

static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
                          bool incremental)
{
        u32 n;

        ceph_decode_32_safe(p, end, n, e_inval);
        while (n--) {
                struct ceph_pg_pool_info *pi;
                u64 pool;
                int ret;

                ceph_decode_64_safe(p, end, pool, e_inval);

                pi = lookup_pg_pool(&map->pg_pools, pool);
                if (!incremental || !pi) {
                        pi = kzalloc(sizeof(*pi), GFP_NOFS);
                        if (!pi)
                                return -ENOMEM;

                        RB_CLEAR_NODE(&pi->node);
                        pi->id = pool;

                        if (!__insert_pg_pool(&map->pg_pools, pi)) {
                                kfree(pi);
                                return -EEXIST;
                        }
                }

                ret = decode_pool(p, end, pi);
                if (ret)
                        return ret;
        }

        return 0;

e_inval:
        return -EINVAL;
}

static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pools(p, end, map, false);
}

static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
{
        return __decode_pools(p, end, map, true);
}

typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);

static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
                             decode_mapping_fn_t fn, bool incremental)
{
        u32 n;

        WARN_ON(!incremental && !fn);

        ceph_decode_32_safe(p, end, n, e_inval);
        while (n--) {
                struct ceph_pg_mapping *pg;
                struct ceph_pg pgid;
                int ret;

                ret = ceph_decode_pgid(p, end, &pgid);
                if (ret)
                        return ret;

                pg = lookup_pg_mapping(mapping_root, &pgid);
                if (pg) {
                        WARN_ON(!incremental);
                        erase_pg_mapping(mapping_root, pg);
                        free_pg_mapping(pg);
                }

                if (fn) {
                        pg = fn(p, end, incremental);
                        if (IS_ERR(pg))
                                return PTR_ERR(pg);

                        if (pg) {
                                pg->pgid = pgid; /* struct */
                                insert_pg_mapping(mapping_root, pg);
                        }
                }
        }

        return 0;

e_inval:
        return -EINVAL;
}

static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
                                                bool incremental)
{
        struct ceph_pg_mapping *pg;
        u32 len, i;

        ceph_decode_32_safe(p, end, len, e_inval);
        if (len == 0 && incremental)
                return NULL;    /* new_pg_temp: [] to remove */
        if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
                return ERR_PTR(-EINVAL);

        ceph_decode_need(p, end, len * sizeof(u32), e_inval);
        pg = alloc_pg_mapping(len * sizeof(u32));
        if (!pg)
                return ERR_PTR(-ENOMEM);

        pg->pg_temp.len = len;
        for (i = 0; i < len; i++)
                pg->pg_temp.osds[i] = ceph_decode_32(p);

        return pg;

e_inval:
        return ERR_PTR(-EINVAL);
}

static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
                                 false);
}

static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
                                 true);
}

static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
                                                     bool incremental)
{
        struct ceph_pg_mapping *pg;
        u32 osd;

        ceph_decode_32_safe(p, end, osd, e_inval);
        if (osd == (u32)-1 && incremental)
                return NULL;    /* new_primary_temp: -1 to remove */

        pg = alloc_pg_mapping(0);
        if (!pg)
                return ERR_PTR(-ENOMEM);

        pg->primary_temp.osd = osd;
        return pg;

e_inval:
        return ERR_PTR(-EINVAL);
}

static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->primary_temp,
                                 __decode_primary_temp, false);
}

static int decode_new_primary_temp(void **p, void *end,
                                   struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->primary_temp,
                                 __decode_primary_temp, true);
}

u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
{
        BUG_ON(osd >= map->max_osd);

        if (!map->osd_primary_affinity)
                return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;

        return map->osd_primary_affinity[osd];
}

static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
{
        BUG_ON(osd >= map->max_osd);

        if (!map->osd_primary_affinity) {
                int i;

                map->osd_primary_affinity = ceph_kvmalloc(
                    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
                    GFP_NOFS);
                if (!map->osd_primary_affinity)
                        return -ENOMEM;

                for (i = 0; i < map->max_osd; i++)
                        map->osd_primary_affinity[i] =
                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
        }

        map->osd_primary_affinity[osd] = aff;

        return 0;
}

static int decode_primary_affinity(void **p, void *end,
                                   struct ceph_osdmap *map)
{
        u32 len, i;

        ceph_decode_32_safe(p, end, len, e_inval);
        if (len == 0) {
                kvfree(map->osd_primary_affinity);
                map->osd_primary_affinity = NULL;
                return 0;
        }
        if (len != map->max_osd)
                goto e_inval;

        ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);

        for (i = 0; i < map->max_osd; i++) {
                int ret;

                ret = set_primary_affinity(map, i, ceph_decode_32(p));
                if (ret)
                        return ret;
        }

        return 0;

e_inval:
        return -EINVAL;
}

static int decode_new_primary_affinity(void **p, void *end,
                                       struct ceph_osdmap *map)
{
        u32 n;

        ceph_decode_32_safe(p, end, n, e_inval);
        while (n--) {
                u32 osd, aff;
                int ret;

                ceph_decode_32_safe(p, end, osd, e_inval);
                ceph_decode_32_safe(p, end, aff, e_inval);

                ret = set_primary_affinity(map, osd, aff);
                if (ret)
                        return ret;

                pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
        }

        return 0;

e_inval:
        return -EINVAL;
}

static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
                                                 bool __unused)
{
        return __decode_pg_temp(p, end, false);
}

static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
                                 false);
}

static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
                                 true);
}

static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
}

static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
                                                       bool __unused)
{
        struct ceph_pg_mapping *pg;
        u32 len, i;

        ceph_decode_32_safe(p, end, len, e_inval);
        if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
                return ERR_PTR(-EINVAL);

        ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
        pg = alloc_pg_mapping(2 * len * sizeof(u32));
        if (!pg)
                return ERR_PTR(-ENOMEM);

        pg->pg_upmap_items.len = len;
        for (i = 0; i < len; i++) {
                pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
                pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
        }

        return pg;

e_inval:
        return ERR_PTR(-EINVAL);
}

static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_upmap_items,
                                 __decode_pg_upmap_items, false);
}

static int decode_new_pg_upmap_items(void **p, void *end,
                                     struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_upmap_items,
                                 __decode_pg_upmap_items, true);
}

static int decode_old_pg_upmap_items(void **p, void *end,
                                     struct ceph_osdmap *map)
{
        return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
}

/*
 * decode a full map.
 */
static int osdmap_decode(void **p, void *end, bool msgr2,
                         struct ceph_osdmap *map)
{
        u8 struct_v;
        u32 epoch = 0;
        void *start = *p;
        u32 max;
        u32 len, i;
        int err;

        dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));

        err = get_osdmap_client_data_v(p, end, "full", &struct_v);
        if (err)
                goto bad;

        /* fsid, epoch, created, modified */
        ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
                         sizeof(map->created) + sizeof(map->modified), e_inval);
        ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
        epoch = map->epoch = ceph_decode_32(p);
        ceph_decode_copy(p, &map->created, sizeof(map->created));
        ceph_decode_copy(p, &map->modified, sizeof(map->modified));

        /* pools */
        err = decode_pools(p, end, map);
        if (err)
                goto bad;

        /* pool_name */
        err = decode_pool_names(p, end, map);
        if (err)
                goto bad;

        ceph_decode_32_safe(p, end, map->pool_max, e_inval);

        ceph_decode_32_safe(p, end, map->flags, e_inval);

        /* max_osd */
        ceph_decode_32_safe(p, end, max, e_inval);

        /* (re)alloc osd arrays */
        err = osdmap_set_max_osd(map, max);
        if (err)
                goto bad;

        /* osd_state, osd_weight, osd_addrs->client_addr */
        ceph_decode_need(p, end, 3*sizeof(u32) +
                         map->max_osd*(struct_v >= 5 ? sizeof(u32) :
                                                       sizeof(u8)) +
                                       sizeof(*map->osd_weight), e_inval);
        if (ceph_decode_32(p) != map->max_osd)
                goto e_inval;

        if (struct_v >= 5) {
                for (i = 0; i < map->max_osd; i++)
                        map->osd_state[i] = ceph_decode_32(p);
        } else {
                for (i = 0; i < map->max_osd; i++)
                        map->osd_state[i] = ceph_decode_8(p);
        }

        if (ceph_decode_32(p) != map->max_osd)
                goto e_inval;

        for (i = 0; i < map->max_osd; i++)
                map->osd_weight[i] = ceph_decode_32(p);

        if (ceph_decode_32(p) != map->max_osd)
                goto e_inval;

        for (i = 0; i < map->max_osd; i++) {
                struct ceph_entity_addr *addr = &map->osd_addr[i];

                if (struct_v >= 8)
                        err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
                else
                        err = ceph_decode_entity_addr(p, end, addr);
                if (err)
                        goto bad;

                dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
        }

        /* pg_temp */
        err = decode_pg_temp(p, end, map);
        if (err)
                goto bad;

        /* primary_temp */
1740        if (struct_v >= 1) {
1741                err = decode_primary_temp(p, end, map);
1742                if (err)
1743                        goto bad;
1744        }
1745
1746        /* primary_affinity */
1747        if (struct_v >= 2) {
1748                err = decode_primary_affinity(p, end, map);
1749                if (err)
1750                        goto bad;
1751        } else {
1752                WARN_ON(map->osd_primary_affinity);
1753        }
1754
1755        /* crush */
1756        ceph_decode_32_safe(p, end, len, e_inval);
1757        err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
1758        if (err)
1759                goto bad;
1760
1761        *p += len;
1762        if (struct_v >= 3) {
1763                /* erasure_code_profiles */
1764                ceph_decode_skip_map_of_map(p, end, string, string, string,
1765                                            e_inval);
1766        }
1767
1768        if (struct_v >= 4) {
1769                err = decode_pg_upmap(p, end, map);
1770                if (err)
1771                        goto bad;
1772
1773                err = decode_pg_upmap_items(p, end, map);
1774                if (err)
1775                        goto bad;
1776        } else {
1777                WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
1778                WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
1779        }
1780
1781        /* ignore the rest */
1782        *p = end;
1783
1784        dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1785        return 0;
1786
1787e_inval:
1788        err = -EINVAL;
1789bad:
1790        pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1791               err, epoch, (int)(*p - start), *p, start, end);
1792        print_hex_dump(KERN_DEBUG, "osdmap: ",
1793                       DUMP_PREFIX_OFFSET, 16, 1,
1794                       start, end - start, true);
1795        return err;
1796}
1797
1798/*
1799 * Allocate and decode a full map.
1800 */
1801struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
1802{
1803        struct ceph_osdmap *map;
1804        int ret;
1805
1806        map = ceph_osdmap_alloc();
1807        if (!map)
1808                return ERR_PTR(-ENOMEM);
1809
1810        ret = osdmap_decode(p, end, msgr2, map);
1811        if (ret) {
1812                ceph_osdmap_destroy(map);
1813                return ERR_PTR(ret);
1814        }
1815
1816        return map;
1817}
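
/*
 * Illustrative sketch (not part of osdmap.c): how a caller might turn
 * a raw full-map payload into a struct ceph_osdmap.  The helper name
 * example_handle_osdmap and its arguments are assumptions; the real
 * callers live in the osd_client map-handling path.
 */
static struct ceph_osdmap *example_handle_osdmap(void *data, size_t len,
                                                 bool msgr2)
{
        void *p = data;
        void *end = data + len;
        struct ceph_osdmap *map;

        /* ceph_osdmap_decode() advances p and returns ERR_PTR on failure */
        map = ceph_osdmap_decode(&p, end, msgr2);
        if (IS_ERR(map))
                return map;

        dout("decoded osdmap epoch %d\n", map->epoch);
        return map;
}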
1818
1819/*
1820 * Encoding order is (new_up_client, new_state, new_weight).  Need to
1821 * apply in the (new_weight, new_state, new_up_client) order, because
1822 * an incremental map may look like e.g.
1823 *
1824 *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1825 *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1826 */
1827static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
1828                                      bool msgr2, struct ceph_osdmap *map)
1829{
1830        void *new_up_client;
1831        void *new_state;
1832        void *new_weight_end;
1833        u32 len;
1834        int ret;
1835        int i;
1836
1837        new_up_client = *p;
1838        ceph_decode_32_safe(p, end, len, e_inval);
1839        for (i = 0; i < len; ++i) {
1840                struct ceph_entity_addr addr;
1841
1842                ceph_decode_skip_32(p, end, e_inval);
1843                if (struct_v >= 7)
1844                        ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1845                else
1846                        ret = ceph_decode_entity_addr(p, end, &addr);
1847                if (ret)
1848                        return ret;
1849        }
1850
1851        new_state = *p;
1852        ceph_decode_32_safe(p, end, len, e_inval);
1853        len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
1854        ceph_decode_need(p, end, len, e_inval);
1855        *p += len;
1856
1857        /* new_weight */
1858        ceph_decode_32_safe(p, end, len, e_inval);
1859        while (len--) {
1860                s32 osd;
1861                u32 w;
1862
1863                ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1864                osd = ceph_decode_32(p);
1865                w = ceph_decode_32(p);
1866                BUG_ON(osd >= map->max_osd);
1867                pr_info("osd%d weight 0x%x %s\n", osd, w,
1868                     w == CEPH_OSD_IN ? "(in)" :
1869                     (w == CEPH_OSD_OUT ? "(out)" : ""));
1870                map->osd_weight[osd] = w;
1871
1872                /*
1873                 * If we are marking in, set the EXISTS bit and clear
1874                 * the AUTOOUT and NEW bits.
1875                 */
1876                if (w) {
1877                        map->osd_state[osd] |= CEPH_OSD_EXISTS;
1878                        map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1879                                                 CEPH_OSD_NEW);
1880                }
1881        }
1882        new_weight_end = *p;
1883
1884        /* new_state (up/down) */
1885        *p = new_state;
1886        len = ceph_decode_32(p);
1887        while (len--) {
1888                s32 osd;
1889                u32 xorstate;
1890
1891                osd = ceph_decode_32(p);
1892                if (struct_v >= 5)
1893                        xorstate = ceph_decode_32(p);
1894                else
1895                        xorstate = ceph_decode_8(p);
1896                if (xorstate == 0)
1897                        xorstate = CEPH_OSD_UP;
1898                BUG_ON(osd >= map->max_osd);
1899                if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1900                    (xorstate & CEPH_OSD_UP))
1901                        pr_info("osd%d down\n", osd);
1902                if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1903                    (xorstate & CEPH_OSD_EXISTS)) {
1904                        pr_info("osd%d does not exist\n", osd);
1905                        ret = set_primary_affinity(map, osd,
1906                                                   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1907                        if (ret)
1908                                return ret;
1909                        memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1910                        map->osd_state[osd] = 0;
1911                } else {
1912                        map->osd_state[osd] ^= xorstate;
1913                }
1914        }
1915
1916        /* new_up_client */
1917        *p = new_up_client;
1918        len = ceph_decode_32(p);
1919        while (len--) {
1920                s32 osd;
1921                struct ceph_entity_addr addr;
1922
1923                osd = ceph_decode_32(p);
1924                BUG_ON(osd >= map->max_osd);
1925                if (struct_v >= 7)
1926                        ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1927                else
1928                        ret = ceph_decode_entity_addr(p, end, &addr);
1929                if (ret)
1930                        return ret;
1931
1932                dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));
1933
1934                pr_info("osd%d up\n", osd);
1935                map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1936                map->osd_addr[osd] = addr;
1937        }
1938
1939        *p = new_weight_end;
1940        return 0;
1941
1942e_inval:
1943        return -EINVAL;
1944}
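
/*
 * Worked example (illustrative): suppose osd3 is "exists, up" and an
 * incremental carries new_state { osd=3, xorstate=0 }.  A zero
 * xorstate is treated as CEPH_OSD_UP above, so osd3's UP bit is
 * toggled off and "osd3 down" is logged.  If instead xorstate had
 * EXISTS set while osd3 still existed, the OSD would be wiped
 * entirely: state zeroed, address cleared and primary affinity reset
 * to the default.
 */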
1945
1946/*
1947 * decode and apply an incremental map update.
1948 */
1949struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
1950                                             struct ceph_osdmap *map)
1951{
1952        struct ceph_fsid fsid;
1953        u32 epoch = 0;
1954        struct ceph_timespec modified;
1955        s32 len;
1956        u64 pool;
1957        __s64 new_pool_max;
1958        __s32 new_flags, max;
1959        void *start = *p;
1960        int err;
1961        u8 struct_v;
1962
1963        dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1964
1965        err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
1966        if (err)
1967                goto bad;
1968
1969        /* fsid, epoch, modified, new_pool_max, new_flags */
1970        ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1971                         sizeof(u64) + sizeof(u32), e_inval);
1972        ceph_decode_copy(p, &fsid, sizeof(fsid));
1973        epoch = ceph_decode_32(p);
1974        BUG_ON(epoch != map->epoch+1);
1975        ceph_decode_copy(p, &modified, sizeof(modified));
1976        new_pool_max = ceph_decode_64(p);
1977        new_flags = ceph_decode_32(p);
1978
1979        /* full map? */
1980        ceph_decode_32_safe(p, end, len, e_inval);
1981        if (len > 0) {
1982                dout("apply_incremental full map len %d, %p to %p\n",
1983                     len, *p, end);
1984                return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
1985        }
1986
1987        /* new crush? */
1988        ceph_decode_32_safe(p, end, len, e_inval);
1989        if (len > 0) {
1990                err = osdmap_set_crush(map,
1991                                       crush_decode(*p, min(*p + len, end)));
1992                if (err)
1993                        goto bad;
1994                *p += len;
1995        }
1996
1997        /* new flags? */
1998        if (new_flags >= 0)
1999                map->flags = new_flags;
2000        if (new_pool_max >= 0)
2001                map->pool_max = new_pool_max;
2002
2003        /* new max? */
2004        ceph_decode_32_safe(p, end, max, e_inval);
2005        if (max >= 0) {
2006                err = osdmap_set_max_osd(map, max);
2007                if (err)
2008                        goto bad;
2009        }
2010
2011        map->epoch++;
2012        map->modified = modified;
2013
2014        /* new_pools */
2015        err = decode_new_pools(p, end, map);
2016        if (err)
2017                goto bad;
2018
2019        /* new_pool_names */
2020        err = decode_pool_names(p, end, map);
2021        if (err)
2022                goto bad;
2023
2024        /* old_pool */
2025        ceph_decode_32_safe(p, end, len, e_inval);
2026        while (len--) {
2027                struct ceph_pg_pool_info *pi;
2028
2029                ceph_decode_64_safe(p, end, pool, e_inval);
2030                pi = lookup_pg_pool(&map->pg_pools, pool);
2031                if (pi)
2032                        __remove_pg_pool(&map->pg_pools, pi);
2033        }
2034
2035        /* new_up_client, new_state, new_weight */
2036        err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
2037        if (err)
2038                goto bad;
2039
2040        /* new_pg_temp */
2041        err = decode_new_pg_temp(p, end, map);
2042        if (err)
2043                goto bad;
2044
2045        /* new_primary_temp */
2046        if (struct_v >= 1) {
2047                err = decode_new_primary_temp(p, end, map);
2048                if (err)
2049                        goto bad;
2050        }
2051
2052        /* new_primary_affinity */
2053        if (struct_v >= 2) {
2054                err = decode_new_primary_affinity(p, end, map);
2055                if (err)
2056                        goto bad;
2057        }
2058
2059        if (struct_v >= 3) {
2060                /* new_erasure_code_profiles */
2061                ceph_decode_skip_map_of_map(p, end, string, string, string,
2062                                            e_inval);
2063                /* old_erasure_code_profiles */
2064                ceph_decode_skip_set(p, end, string, e_inval);
2065        }
2066
2067        if (struct_v >= 4) {
2068                err = decode_new_pg_upmap(p, end, map);
2069                if (err)
2070                        goto bad;
2071
2072                err = decode_old_pg_upmap(p, end, map);
2073                if (err)
2074                        goto bad;
2075
2076                err = decode_new_pg_upmap_items(p, end, map);
2077                if (err)
2078                        goto bad;
2079
2080                err = decode_old_pg_upmap_items(p, end, map);
2081                if (err)
2082                        goto bad;
2083        }
2084
2085        /* ignore the rest */
2086        *p = end;
2087
2088        dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
2089        return map;
2090
2091e_inval:
2092        err = -EINVAL;
2093bad:
2094        pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
2095               err, epoch, (int)(*p - start), *p, start, end);
2096        print_hex_dump(KERN_DEBUG, "osdmap: ",
2097                       DUMP_PREFIX_OFFSET, 16, 1,
2098                       start, end - start, true);
2099        return ERR_PTR(err);
2100}
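
/*
 * Illustrative sketch (not part of osdmap.c): applying an incremental
 * on top of the current map.  It shows the two outcomes visible in
 * osdmap_apply_incremental(): the same map pointer updated in place,
 * or a brand new map when the incremental embeds a full map.  The
 * helper name example_apply_inc and its locking-free context are
 * assumptions.
 */
static int example_apply_inc(struct ceph_osdmap **pmap, void *data,
                             size_t len, bool msgr2)
{
        void *p = data;
        struct ceph_osdmap *newmap;

        newmap = osdmap_apply_incremental(&p, data + len, msgr2, *pmap);
        if (IS_ERR(newmap))
                return PTR_ERR(newmap);

        if (newmap != *pmap) {
                /* incremental contained a full map - drop the old one */
                ceph_osdmap_destroy(*pmap);
                *pmap = newmap;
        }
        return 0;
}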
2101
2102void ceph_oloc_copy(struct ceph_object_locator *dest,
2103                    const struct ceph_object_locator *src)
2104{
2105        ceph_oloc_destroy(dest);
2106
2107        dest->pool = src->pool;
2108        if (src->pool_ns)
2109                dest->pool_ns = ceph_get_string(src->pool_ns);
2110        else
2111                dest->pool_ns = NULL;
2112}
2113EXPORT_SYMBOL(ceph_oloc_copy);
2114
2115void ceph_oloc_destroy(struct ceph_object_locator *oloc)
2116{
2117        ceph_put_string(oloc->pool_ns);
2118}
2119EXPORT_SYMBOL(ceph_oloc_destroy);
2120
2121void ceph_oid_copy(struct ceph_object_id *dest,
2122                   const struct ceph_object_id *src)
2123{
2124        ceph_oid_destroy(dest);
2125
2126        if (src->name != src->inline_name) {
2127                /* very rare, see ceph_object_id definition */
2128                dest->name = kmalloc(src->name_len + 1,
2129                                     GFP_NOIO | __GFP_NOFAIL);
2130        } else {
2131                dest->name = dest->inline_name;
2132        }
2133        memcpy(dest->name, src->name, src->name_len + 1);
2134        dest->name_len = src->name_len;
2135}
2136EXPORT_SYMBOL(ceph_oid_copy);
2137
2138static __printf(2, 0)
2139int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
2140{
2141        int len;
2142
2143        WARN_ON(!ceph_oid_empty(oid));
2144
2145        len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
2146        if (len >= sizeof(oid->inline_name))
2147                return len;
2148
2149        oid->name_len = len;
2150        return 0;
2151}
2152
2153/*
2154 * If oid doesn't fit into inline buffer, BUG.
2155 */
2156void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
2157{
2158        va_list ap;
2159
2160        va_start(ap, fmt);
2161        BUG_ON(oid_printf_vargs(oid, fmt, ap));
2162        va_end(ap);
2163}
2164EXPORT_SYMBOL(ceph_oid_printf);
2165
2166static __printf(3, 0)
2167int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
2168                      const char *fmt, va_list ap)
2169{
2170        va_list aq;
2171        int len;
2172
2173        va_copy(aq, ap);
2174        len = oid_printf_vargs(oid, fmt, aq);
2175        va_end(aq);
2176
2177        if (len) {
2178                char *external_name;
2179
2180                external_name = kmalloc(len + 1, gfp);
2181                if (!external_name)
2182                        return -ENOMEM;
2183
2184                oid->name = external_name;
2185                WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
2186                oid->name_len = len;
2187        }
2188
2189        return 0;
2190}
2191
2192/*
2193 * If oid doesn't fit into inline buffer, allocate.
2194 */
2195int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
2196                     const char *fmt, ...)
2197{
2198        va_list ap;
2199        int ret;
2200
2201        va_start(ap, fmt);
2202        ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
2203        va_end(ap);
2204
2205        return ret;
2206}
2207EXPORT_SYMBOL(ceph_oid_aprintf);
2208
2209void ceph_oid_destroy(struct ceph_object_id *oid)
2210{
2211        if (oid->name != oid->inline_name)
2212                kfree(oid->name);
2213}
2214EXPORT_SYMBOL(ceph_oid_destroy);
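
/*
 * Illustrative sketch (not part of osdmap.c): a typical object-id
 * lifecycle.  ceph_oid_init() is the initializer from
 * <linux/ceph/osdmap.h>; the rbd-style format string and the helper
 * name example_name_object are just assumptions for the example.
 */
static int example_name_object(u64 image_id)
{
        struct ceph_object_id oid;
        int ret;

        ceph_oid_init(&oid);

        /* short names stay in the inline buffer, long ones are kmalloc'ed */
        ret = ceph_oid_aprintf(&oid, GFP_NOIO, "header.%llu", image_id);
        if (ret)
                return ret;

        /* ... hand oid to an osd request ... */

        ceph_oid_destroy(&oid);
        return 0;
}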
2215
2216/*
2217 * osds only
2218 */
2219static bool __osds_equal(const struct ceph_osds *lhs,
2220                         const struct ceph_osds *rhs)
2221{
2222        if (lhs->size == rhs->size &&
2223            !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
2224                return true;
2225
2226        return false;
2227}
2228
2229/*
2230 * osds + primary
2231 */
2232static bool osds_equal(const struct ceph_osds *lhs,
2233                       const struct ceph_osds *rhs)
2234{
2235        if (__osds_equal(lhs, rhs) &&
2236            lhs->primary == rhs->primary)
2237                return true;
2238
2239        return false;
2240}
2241
2242static bool osds_valid(const struct ceph_osds *set)
2243{
2244        /* non-empty set */
2245        if (set->size > 0 && set->primary >= 0)
2246                return true;
2247
2248        /* empty can_shift_osds set */
2249        if (!set->size && set->primary == -1)
2250                return true;
2251
2252        /* empty !can_shift_osds set - all NONE */
2253        if (set->size > 0 && set->primary == -1) {
2254                int i;
2255
2256                for (i = 0; i < set->size; i++) {
2257                        if (set->osds[i] != CRUSH_ITEM_NONE)
2258                                break;
2259                }
2260                if (i == set->size)
2261                        return true;
2262        }
2263
2264        return false;
2265}
2266
2267void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
2268{
2269        memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
2270        dest->size = src->size;
2271        dest->primary = src->primary;
2272}
2273
2274bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
2275                      u32 new_pg_num)
2276{
2277        int old_bits = calc_bits_of(old_pg_num);
2278        int old_mask = (1 << old_bits) - 1;
2279        int n;
2280
2281        WARN_ON(pgid->seed >= old_pg_num);
2282        if (new_pg_num <= old_pg_num)
2283                return false;
2284
2285        for (n = 1; ; n++) {
2286                int next_bit = n << (old_bits - 1);
2287                u32 s = next_bit | pgid->seed;
2288
2289                if (s < old_pg_num || s == pgid->seed)
2290                        continue;
2291                if (s >= new_pg_num)
2292                        break;
2293
2294                s = ceph_stable_mod(s, old_pg_num, old_mask);
2295                if (s == pgid->seed)
2296                        return true;
2297        }
2298
2299        return false;
2300}
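
/*
 * Worked example (illustrative): take pgid->seed = 1, old_pg_num = 4,
 * new_pg_num = 8.  calc_bits_of(4) = 3, so old_mask = 7.  For n = 1
 * the candidate child is s = (1 << 2) | 1 = 5: it is >= old_pg_num,
 * < new_pg_num, and ceph_stable_mod(5, 4, 7) = 5 & 3 = 1 == seed, so
 * PG 1 is split (its objects are redistributed between PGs 1 and 5)
 * and the function returns true.
 */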
2301
2302bool ceph_is_new_interval(const struct ceph_osds *old_acting,
2303                          const struct ceph_osds *new_acting,
2304                          const struct ceph_osds *old_up,
2305                          const struct ceph_osds *new_up,
2306                          int old_size,
2307                          int new_size,
2308                          int old_min_size,
2309                          int new_min_size,
2310                          u32 old_pg_num,
2311                          u32 new_pg_num,
2312                          bool old_sort_bitwise,
2313                          bool new_sort_bitwise,
2314                          bool old_recovery_deletes,
2315                          bool new_recovery_deletes,
2316                          const struct ceph_pg *pgid)
2317{
2318        return !osds_equal(old_acting, new_acting) ||
2319               !osds_equal(old_up, new_up) ||
2320               old_size != new_size ||
2321               old_min_size != new_min_size ||
2322               ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
2323               old_sort_bitwise != new_sort_bitwise ||
2324               old_recovery_deletes != new_recovery_deletes;
2325}
2326
2327static int calc_pg_rank(int osd, const struct ceph_osds *acting)
2328{
2329        int i;
2330
2331        for (i = 0; i < acting->size; i++) {
2332                if (acting->osds[i] == osd)
2333                        return i;
2334        }
2335
2336        return -1;
2337}
2338
2339static bool primary_changed(const struct ceph_osds *old_acting,
2340                            const struct ceph_osds *new_acting)
2341{
2342        if (!old_acting->size && !new_acting->size)
2343                return false; /* both still empty */
2344
2345        if (!old_acting->size ^ !new_acting->size)
2346                return true; /* was empty, now not, or vice versa */
2347
2348        if (old_acting->primary != new_acting->primary)
2349                return true; /* primary changed */
2350
2351        if (calc_pg_rank(old_acting->primary, old_acting) !=
2352            calc_pg_rank(new_acting->primary, new_acting))
2353                return true;
2354
2355        return false; /* same primary (though replicas may have changed) */
2356}
2357
2358bool ceph_osds_changed(const struct ceph_osds *old_acting,
2359                       const struct ceph_osds *new_acting,
2360                       bool any_change)
2361{
2362        if (primary_changed(old_acting, new_acting))
2363                return true;
2364
2365        if (any_change && !__osds_equal(old_acting, new_acting))
2366                return true;
2367
2368        return false;
2369}
2370
2371/*
2372 * Map an object into a PG.
2373 *
2374 * Should only be called with target_oid and target_oloc (as opposed to
2375 * base_oid and base_oloc), since tiering isn't taken into account.
2376 */
2377void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2378                                 const struct ceph_object_id *oid,
2379                                 const struct ceph_object_locator *oloc,
2380                                 struct ceph_pg *raw_pgid)
2381{
2382        WARN_ON(pi->id != oloc->pool);
2383
2384        if (!oloc->pool_ns) {
2385                raw_pgid->pool = oloc->pool;
2386                raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
2387                                             oid->name_len);
2388                dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
2389                     raw_pgid->pool, raw_pgid->seed);
2390        } else {
2391                char stack_buf[256];
2392                char *buf = stack_buf;
2393                int nsl = oloc->pool_ns->len;
2394                size_t total = nsl + 1 + oid->name_len;
2395
2396                if (total > sizeof(stack_buf))
2397                        buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
2398                memcpy(buf, oloc->pool_ns->str, nsl);
2399                buf[nsl] = '\037';
2400                memcpy(buf + nsl + 1, oid->name, oid->name_len);
2401                raw_pgid->pool = oloc->pool;
2402                raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
2403                if (buf != stack_buf)
2404                        kfree(buf);
2405                dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
2406                     oid->name, nsl, oloc->pool_ns->str,
2407                     raw_pgid->pool, raw_pgid->seed);
2408        }
2409}
2410
2411int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2412                              const struct ceph_object_id *oid,
2413                              const struct ceph_object_locator *oloc,
2414                              struct ceph_pg *raw_pgid)
2415{
2416        struct ceph_pg_pool_info *pi;
2417
2418        pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2419        if (!pi)
2420                return -ENOENT;
2421
2422        __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2423        return 0;
2424}
2425EXPORT_SYMBOL(ceph_object_locator_to_pg);
2426
2427/*
2428 * Map a raw PG (full precision ps) into an actual PG.
2429 */
2430static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
2431                         const struct ceph_pg *raw_pgid,
2432                         struct ceph_pg *pgid)
2433{
2434        pgid->pool = raw_pgid->pool;
2435        pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
2436                                     pi->pg_num_mask);
2437}
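
/*
 * Worked example (illustrative): with pg_num = 12, pg_num_mask is
 * (1 << calc_bits_of(11)) - 1 = 15.  A raw seed of 13 has
 * (13 & 15) = 13 >= 12, so ceph_stable_mod() folds it with the
 * smaller mask: 13 & 7 = 5.  A raw seed of 9 passes the first test
 * and maps to itself.  This keeps all 12 PGs reachable without
 * requiring pg_num to be a power of two.
 */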
2438
2439/*
2440 * Map a raw PG (full precision ps) into a placement ps (placement
2441 * seed).  Include pool id in that value so that different pools don't
2442 * use the same seeds.
2443 */
2444static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
2445                         const struct ceph_pg *raw_pgid)
2446{
2447        if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
2448                /* hash pool id and seed so that pool PGs do not overlap */
2449                return crush_hash32_2(CRUSH_HASH_RJENKINS1,
2450                                      ceph_stable_mod(raw_pgid->seed,
2451                                                      pi->pgp_num,
2452                                                      pi->pgp_num_mask),
2453                                      raw_pgid->pool);
2454        } else {
2455                /*
2456                 * legacy behavior: add ps and pool together.  this is
2457                 * not a great approach because the PGs from each pool
2458                 * will overlap on top of each other: 0.5 == 1.4 ==
2459                 * 2.3 == ...
2460                 */
2461                return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
2462                                       pi->pgp_num_mask) +
2463                       (unsigned)raw_pgid->pool;
2464        }
2465}
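
/*
 * Illustrative numbers for the overlap above: without HASHPSPOOL,
 * pool 0 PG 5, pool 1 PG 4 and pool 2 PG 3 (with pgp_num > 5) all
 * yield pps 5 and therefore identical CRUSH placement.  With
 * HASHPSPOOL the pool id is mixed in via crush_hash32_2(), so the
 * per-pool streams no longer collide.
 */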
2466
2467/*
2468 * Magic value used for a "default" fallback choose_args, used if the
2469 * crush_choose_arg_map passed to do_crush() does not exist.  If this
2470 * also doesn't exist, fall back to canonical weights.
2471 */
2472#define CEPH_DEFAULT_CHOOSE_ARGS        -1
2473
2474static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
2475                    int *result, int result_max,
2476                    const __u32 *weight, int weight_max,
2477                    s64 choose_args_index)
2478{
2479        struct crush_choose_arg_map *arg_map;
2480        struct crush_work *work;
2481        int r;
2482
2483        BUG_ON(result_max > CEPH_PG_MAX_SIZE);
2484
2485        arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2486                                        choose_args_index);
2487        if (!arg_map)
2488                arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2489                                                CEPH_DEFAULT_CHOOSE_ARGS);
2490
2491        work = get_workspace(&map->crush_wsm, map->crush);
2492        r = crush_do_rule(map->crush, ruleno, x, result, result_max,
2493                          weight, weight_max, work,
2494                          arg_map ? arg_map->args : NULL);
2495        put_workspace(&map->crush_wsm, work);
2496        return r;
2497}
2498
2499static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2500                                    struct ceph_pg_pool_info *pi,
2501                                    struct ceph_osds *set)
2502{
2503        int i;
2504
2505        if (ceph_can_shift_osds(pi)) {
2506                int removed = 0;
2507
2508                /* shift left */
2509                for (i = 0; i < set->size; i++) {
2510                        if (!ceph_osd_exists(osdmap, set->osds[i])) {
2511                                removed++;
2512                                continue;
2513                        }
2514                        if (removed)
2515                                set->osds[i - removed] = set->osds[i];
2516                }
2517                set->size -= removed;
2518        } else {
2519                /* set nonexistent (dne) devices to NONE */
2520                for (i = 0; i < set->size; i++) {
2521                        if (!ceph_osd_exists(osdmap, set->osds[i]))
2522                                set->osds[i] = CRUSH_ITEM_NONE;
2523                }
2524        }
2525}
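
/*
 * Worked example (illustrative): raw set [1, 4, 7] where osd4 does
 * not exist.  For a replicated pool (can_shift_osds) the set is
 * compacted to [1, 7]; for an erasure-coded pool the positions are
 * significant, so the result is [1, NONE, 7] instead.
 */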
2526
2527/*
2528 * Calculate raw set (CRUSH output) for given PG and filter out
2529 * nonexistent OSDs.  ->primary is undefined for a raw set.
2530 *
2531 * Placement seed (CRUSH input) is returned through @ppps.
2532 */
2533static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
2534                           struct ceph_pg_pool_info *pi,
2535                           const struct ceph_pg *raw_pgid,
2536                           struct ceph_osds *raw,
2537                           u32 *ppps)
2538{
2539        u32 pps = raw_pg_to_pps(pi, raw_pgid);
2540        int ruleno;
2541        int len;
2542
2543        ceph_osds_init(raw);
2544        if (ppps)
2545                *ppps = pps;
2546
2547        ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
2548                                 pi->size);
2549        if (ruleno < 0) {
2550                pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
2551                       pi->id, pi->crush_ruleset, pi->type, pi->size);
2552                return;
2553        }
2554
2555        if (pi->size > ARRAY_SIZE(raw->osds)) {
2556                pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
2557                       pi->id, pi->crush_ruleset, pi->type, pi->size,
2558                       ARRAY_SIZE(raw->osds));
2559                return;
2560        }
2561
2562        len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
2563                       osdmap->osd_weight, osdmap->max_osd, pi->id);
2564        if (len < 0) {
2565                pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
2566                       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
2567                       pi->size);
2568                return;
2569        }
2570
2571        raw->size = len;
2572        remove_nonexistent_osds(osdmap, pi, raw);
2573}
2574
2575/* apply pg_upmap[_items] mappings */
2576static void apply_upmap(struct ceph_osdmap *osdmap,
2577                        const struct ceph_pg *pgid,
2578                        struct ceph_osds *raw)
2579{
2580        struct ceph_pg_mapping *pg;
2581        int i, j;
2582
2583        pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
2584        if (pg) {
2585                /* make sure targets aren't marked out */
2586                for (i = 0; i < pg->pg_upmap.len; i++) {
2587                        int osd = pg->pg_upmap.osds[i];
2588
2589                        if (osd != CRUSH_ITEM_NONE &&
2590                            osd < osdmap->max_osd &&
2591                            osdmap->osd_weight[osd] == 0) {
2592                                /* reject/ignore explicit mapping */
2593                                return;
2594                        }
2595                }
2596                for (i = 0; i < pg->pg_upmap.len; i++)
2597                        raw->osds[i] = pg->pg_upmap.osds[i];
2598                raw->size = pg->pg_upmap.len;
2599                /* check and apply pg_upmap_items, if any */
2600        }
2601
2602        pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
2603        if (pg) {
2604                /*
2605                 * Note: this approach does not allow a bidirectional swap,
2606                 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
2607                 */
2608                for (i = 0; i < pg->pg_upmap_items.len; i++) {
2609                        int from = pg->pg_upmap_items.from_to[i][0];
2610                        int to = pg->pg_upmap_items.from_to[i][1];
2611                        int pos = -1;
2612                        bool exists = false;
2613
2614                        /* make sure replacement doesn't already appear */
2615                        for (j = 0; j < raw->size; j++) {
2616                                int osd = raw->osds[j];
2617
2618                                if (osd == to) {
2619                                        exists = true;
2620                                        break;
2621                                }
2622                                /* ignore mapping if target is marked out */
2623                                if (osd == from && pos < 0 &&
2624                                    !(to != CRUSH_ITEM_NONE &&
2625                                      to < osdmap->max_osd &&
2626                                      osdmap->osd_weight[to] == 0)) {
2627                                        pos = j;
2628                                }
2629                        }
2630                        if (!exists && pos >= 0)
2631                                raw->osds[pos] = to;
2632                }
2633        }
2634}
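
/*
 * Worked example (illustrative): with raw set [1, 4, 7] and a
 * pg_upmap_items entry [[4, 9]], osd9 is not already present and
 * osd4 is, so position 1 is rewritten and the result is [1, 9, 7].
 * A second entry [[7, 1]] would be ignored: osd1 already appears in
 * the set, so the swap is not applied (no bidirectional swaps, per
 * the note above).
 */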
2635
2636/*
2637 * Given raw set, calculate up set and up primary.  By definition of an
2638 * up set, the result won't contain nonexistent or down OSDs.
2639 *
2640 * This is done in-place - on return @set is the up set.  If it's
2641 * empty, ->primary will remain undefined.
2642 */
2643static void raw_to_up_osds(struct ceph_osdmap *osdmap,
2644                           struct ceph_pg_pool_info *pi,
2645                           struct ceph_osds *set)
2646{
2647        int i;
2648
2649        /* ->primary is undefined for a raw set */
2650        BUG_ON(set->primary != -1);
2651
2652        if (ceph_can_shift_osds(pi)) {
2653                int removed = 0;
2654
2655                /* shift left */
2656                for (i = 0; i < set->size; i++) {
2657                        if (ceph_osd_is_down(osdmap, set->osds[i])) {
2658                                removed++;
2659                                continue;
2660                        }
2661                        if (removed)
2662                                set->osds[i - removed] = set->osds[i];
2663                }
2664                set->size -= removed;
2665                if (set->size > 0)
2666                        set->primary = set->osds[0];
2667        } else {
2668                /* set down/nonexistent (dne) devices to NONE */
2669                for (i = set->size - 1; i >= 0; i--) {
2670                        if (ceph_osd_is_down(osdmap, set->osds[i]))
2671                                set->osds[i] = CRUSH_ITEM_NONE;
2672                        else
2673                                set->primary = set->osds[i];
2674                }
2675        }
2676}
2677
2678static void apply_primary_affinity(struct ceph_osdmap *osdmap,
2679                                   struct ceph_pg_pool_info *pi,
2680                                   u32 pps,
2681                                   struct ceph_osds *up)
2682{
2683        int i;
2684        int pos = -1;
2685
2686        /*
2687         * Do we have any non-default primary_affinity values for these
2688         * osds?
2689         */
2690        if (!osdmap->osd_primary_affinity)
2691                return;
2692
2693        for (i = 0; i < up->size; i++) {
2694                int osd = up->osds[i];
2695
2696                if (osd != CRUSH_ITEM_NONE &&
2697                    osdmap->osd_primary_affinity[osd] !=
2698                                        CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2699                        break;
2700                }
2701        }
2702        if (i == up->size)
2703                return;
2704
2705        /*
2706         * Pick the primary.  Feed both the seed (for the pg) and the
2707         * osd into the hash/rng so that a proportional fraction of an
2708         * osd's pgs get rejected as primary.
2709         */
2710        for (i = 0; i < up->size; i++) {
2711                int osd = up->osds[i];
2712                u32 aff;
2713
2714                if (osd == CRUSH_ITEM_NONE)
2715                        continue;
2716
2717                aff = osdmap->osd_primary_affinity[osd];
2718                if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2719                    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
2720                                    pps, osd) >> 16) >= aff) {
2721                        /*
2722                         * We chose not to use this primary.  Note it
2723                         * anyway as a fallback in case we don't pick
2724                         * anyone else, but keep looking.
2725                         */
2726                        if (pos < 0)
2727                                pos = i;
2728                } else {
2729                        pos = i;
2730                        break;
2731                }
2732        }
2733        if (pos < 0)
2734                return;
2735
2736        up->primary = up->osds[pos];
2737
2738        if (ceph_can_shift_osds(pi) && pos > 0) {
2739                /* move the new primary to the front */
2740                for (i = pos; i > 0; i--)
2741                        up->osds[i] = up->osds[i - 1];
2742                up->osds[0] = up->primary;
2743        }
2744}
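
/*
 * Illustrative math: affinity is expressed as a fraction of 0x10000,
 * with CEPH_OSD_MAX_PRIMARY_AFFINITY = 0x10000 meaning "always
 * accept".  The hash above, shifted down to [0, 0xffff], is roughly
 * uniform, so an osd with affinity 0x8000 is accepted as primary for
 * about half of its PGs and rejected (falling through to the next
 * osd in the up set) for the rest.
 */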
2745
2746/*
2747 * Get pg_temp and primary_temp mappings for given PG.
2748 *
2749 * Note that a PG may have none, only pg_temp, only primary_temp or
2750 * both pg_temp and primary_temp mappings.  This means @temp isn't
2751 * always a valid OSD set on return: in the "only primary_temp" case,
2752 * @temp will have its ->primary >= 0 but ->size == 0.
2753 */
2754static void get_temp_osds(struct ceph_osdmap *osdmap,
2755                          struct ceph_pg_pool_info *pi,
2756                          const struct ceph_pg *pgid,
2757                          struct ceph_osds *temp)
2758{
2759        struct ceph_pg_mapping *pg;
2760        int i;
2761
2762        ceph_osds_init(temp);
2763
2764        /* pg_temp? */
2765        pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
2766        if (pg) {
2767                for (i = 0; i < pg->pg_temp.len; i++) {
2768                        if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2769                                if (ceph_can_shift_osds(pi))
2770                                        continue;
2771
2772                                temp->osds[temp->size++] = CRUSH_ITEM_NONE;
2773                        } else {
2774                                temp->osds[temp->size++] = pg->pg_temp.osds[i];
2775                        }
2776                }
2777
2778                /* apply pg_temp's primary */
2779                for (i = 0; i < temp->size; i++) {
2780                        if (temp->osds[i] != CRUSH_ITEM_NONE) {
2781                                temp->primary = temp->osds[i];
2782                                break;
2783                        }
2784                }
2785        }
2786
2787        /* primary_temp? */
2788        pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
2789        if (pg)
2790                temp->primary = pg->primary_temp.osd;
2791}
2792
2793/*
2794 * Map a PG to its acting set as well as its up set.
2795 *
2796 * Acting set is used for data mapping purposes, while up set can be
2797 * recorded for detecting interval changes and deciding whether to
2798 * resend a request.
2799 */
2800void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2801                               struct ceph_pg_pool_info *pi,
2802                               const struct ceph_pg *raw_pgid,
2803                               struct ceph_osds *up,
2804                               struct ceph_osds *acting)
2805{
2806        struct ceph_pg pgid;
2807        u32 pps;
2808
2809        WARN_ON(pi->id != raw_pgid->pool);
2810        raw_pg_to_pg(pi, raw_pgid, &pgid);
2811
2812        pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2813        apply_upmap(osdmap, &pgid, up);
2814        raw_to_up_osds(osdmap, pi, up);
2815        apply_primary_affinity(osdmap, pi, pps, up);
2816        get_temp_osds(osdmap, pi, &pgid, acting);
2817        if (!acting->size) {
2818                memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2819                acting->size = up->size;
2820                if (acting->primary == -1)
2821                        acting->primary = up->primary;
2822        }
2823        WARN_ON(!osds_valid(up) || !osds_valid(acting));
2824}
2825
2826bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
2827                              struct ceph_pg_pool_info *pi,
2828                              const struct ceph_pg *raw_pgid,
2829                              struct ceph_spg *spgid)
2830{
2831        struct ceph_pg pgid;
2832        struct ceph_osds up, acting;
2833        int i;
2834
2835        WARN_ON(pi->id != raw_pgid->pool);
2836        raw_pg_to_pg(pi, raw_pgid, &pgid);
2837
2838        if (ceph_can_shift_osds(pi)) {
2839                spgid->pgid = pgid; /* struct */
2840                spgid->shard = CEPH_SPG_NOSHARD;
2841                return true;
2842        }
2843
2844        ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
2845        for (i = 0; i < acting.size; i++) {
2846                if (acting.osds[i] == acting.primary) {
2847                        spgid->pgid = pgid; /* struct */
2848                        spgid->shard = i;
2849                        return true;
2850                }
2851        }
2852
2853        return false;
2854}
2855
2856/*
2857 * Return acting primary for given PG, or -1 if none.
2858 */
2859int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2860                              const struct ceph_pg *raw_pgid)
2861{
2862        struct ceph_pg_pool_info *pi;
2863        struct ceph_osds up, acting;
2864
2865        pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2866        if (!pi)
2867                return -1;
2868
2869        ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2870        return acting.primary;
2871}
2872EXPORT_SYMBOL(ceph_pg_to_acting_primary);
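
/*
 * Illustrative sketch (not part of osdmap.c): the full object ->
 * acting primary path using the helpers above.  The helper name
 * example_find_primary is hypothetical; real callers live in
 * osd_client.c.
 */
static int example_find_primary(struct ceph_osdmap *osdmap,
                                const struct ceph_object_id *oid,
                                const struct ceph_object_locator *oloc)
{
        struct ceph_pg raw_pgid;
        int ret;

        /* object name (+ optional namespace) -> raw PG */
        ret = ceph_object_locator_to_pg(osdmap, oid, oloc, &raw_pgid);
        if (ret)
                return ret;

        /* raw PG -> actual PG -> up/acting sets -> acting primary */
        return ceph_pg_to_acting_primary(osdmap, &raw_pgid);
}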
2873
2874static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
2875                                              size_t name_len)
2876{
2877        struct crush_loc_node *loc;
2878
2879        loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
2880        if (!loc)
2881                return NULL;
2882
2883        RB_CLEAR_NODE(&loc->cl_node);
2884        return loc;
2885}
2886
2887static void free_crush_loc(struct crush_loc_node *loc)
2888{
2889        WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
2890
2891        kfree(loc);
2892}
2893
2894static int crush_loc_compare(const struct crush_loc *loc1,
2895                             const struct crush_loc *loc2)
2896{
2897        return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2898               strcmp(loc1->cl_name, loc2->cl_name);
2899}
2900
2901DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
2902                 RB_BYPTR, const struct crush_loc *, cl_node)
2903
2904/*
2905 * Parses a set of <bucket type name>':'<bucket name> pairs separated
2906 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2907 *
2908 * Note that @crush_location is modified by strsep().
2909 */
2910int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
2911{
2912        struct crush_loc_node *loc;
2913        const char *type_name, *name, *colon;
2914        size_t type_name_len, name_len;
2915
2916        dout("%s '%s'\n", __func__, crush_location);
2917        while ((type_name = strsep(&crush_location, "|"))) {
2918                colon = strchr(type_name, ':');
2919                if (!colon)
2920                        return -EINVAL;
2921
2922                type_name_len = colon - type_name;
2923                if (type_name_len == 0)
2924                        return -EINVAL;
2925
2926                name = colon + 1;
2927                name_len = strlen(name);
2928                if (name_len == 0)
2929                        return -EINVAL;
2930
2931                loc = alloc_crush_loc(type_name_len, name_len);
2932                if (!loc)
2933                        return -ENOMEM;
2934
2935                loc->cl_loc.cl_type_name = loc->cl_data;
2936                memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
2937                loc->cl_loc.cl_type_name[type_name_len] = '\0';
2938
2939                loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
2940                memcpy(loc->cl_loc.cl_name, name, name_len);
2941                loc->cl_loc.cl_name[name_len] = '\0';
2942
2943                if (!__insert_crush_loc(locs, loc)) {
2944                        free_crush_loc(loc);
2945                        return -EEXIST;
2946                }
2947
2948                dout("%s type_name '%s' name '%s'\n", __func__,
2949                     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
2950        }
2951
2952        return 0;
2953}
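
/*
 * Illustrative sketch (not part of osdmap.c): parsing a crush
 * location option and scoring an osd against it.  kstrdup() is used
 * because strsep() modifies the string; the helper name
 * example_locality and the sample location string are assumptions.
 */
static int example_locality(struct ceph_osdmap *osdmap, int osd)
{
        struct rb_root locs = RB_ROOT;
        char *str;
        int ret;

        str = kstrdup("rack:foo1|rack:foo2|datacenter:bar", GFP_NOIO);
        if (!str)
                return -ENOMEM;

        ret = ceph_parse_crush_location(str, &locs);
        kfree(str);
        if (ret)
                return ret;

        /* lowest matching bucket type id, or -1 if nothing matches */
        ret = ceph_get_crush_locality(osdmap, osd, &locs);
        ceph_clear_crush_locs(&locs);
        return ret;
}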
2954
2955int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2956{
2957        struct rb_node *n1 = rb_first(locs1);
2958        struct rb_node *n2 = rb_first(locs2);
2959        int ret;
2960
2961        for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2962                struct crush_loc_node *loc1 =
2963                    rb_entry(n1, struct crush_loc_node, cl_node);
2964                struct crush_loc_node *loc2 =
2965                    rb_entry(n2, struct crush_loc_node, cl_node);
2966
2967                ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2968                if (ret)
2969                        return ret;
2970        }
2971
2972        if (!n1 && n2)
2973                return -1;
2974        if (n1 && !n2)
2975                return 1;
2976        return 0;
2977}
2978
2979void ceph_clear_crush_locs(struct rb_root *locs)
2980{
2981        while (!RB_EMPTY_ROOT(locs)) {
2982                struct crush_loc_node *loc =
2983                    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
2984
2985                erase_crush_loc(locs, loc);
2986                free_crush_loc(loc);
2987        }
2988}
2989
2990/*
2991 * [a-zA-Z0-9-_.]+
2992 */
2993static bool is_valid_crush_name(const char *name)
2994{
2995        do {
2996                if (!('a' <= *name && *name <= 'z') &&
2997                    !('A' <= *name && *name <= 'Z') &&
2998                    !('0' <= *name && *name <= '9') &&
2999                    *name != '-' && *name != '_' && *name != '.')
3000                        return false;
3001        } while (*++name != '\0');
3002
3003        return true;
3004}
3005
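/*
 * Illustrative note: per-device-class shadow buckets are named with a
 * '~' separator (e.g. "default~hdd" for class "hdd"), which this
 * check rejects, so get_immediate_parent() below only walks the real
 * hierarchy.
 */
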
3006/*
3007 * Gets the parent of an item.  Returns its id (<0 because the
3008 * parent is always a bucket), type id (>0 for the same reason,
3009 * via @parent_type_id) and location (via @parent_loc).  If no
3010 * parent, returns 0.
3011 *
3012 * Does a linear search, as there are no parent pointers of any
3013 * kind.  Note that the result is ambiguous for items that occur
3014 * multiple times in the map.
3015 */
3016static int get_immediate_parent(struct crush_map *c, int id,
3017                                u16 *parent_type_id,
3018                                struct crush_loc *parent_loc)
3019{
3020        struct crush_bucket *b;
3021        struct crush_name_node *type_cn, *cn;
3022        int i, j;
3023
3024        for (i = 0; i < c->max_buckets; i++) {
3025                b = c->buckets[i];
3026                if (!b)
3027                        continue;
3028
3029                /* ignore per-class shadow hierarchy */
3030                cn = lookup_crush_name(&c->names, b->id);
3031                if (!cn || !is_valid_crush_name(cn->cn_name))
3032                        continue;
3033
3034                for (j = 0; j < b->size; j++) {
3035                        if (b->items[j] != id)
3036                                continue;
3037
3038                        *parent_type_id = b->type;
3039                        type_cn = lookup_crush_name(&c->type_names, b->type);
3040                        parent_loc->cl_type_name = type_cn->cn_name;
3041                        parent_loc->cl_name = cn->cn_name;
3042                        return b->id;
3043                }
3044        }
3045
3046        return 0;  /* no parent */
3047}
3048
3049/*
3050 * Calculates the locality/distance from an item to a client
3051 * location expressed in terms of CRUSH hierarchy as a set of
3052 * (bucket type name, bucket name) pairs.  Specifically, looks
3053 * for the lowest-valued bucket type for which the location of
3054 * @id matches one of the locations in @locs, so for standard
3055 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
3056 * a matching host is closer than a matching rack and a matching
3057 * data center is closer than a matching zone.
3058 *
3059 * Specifying multiple locations (a "multipath" location) such
3060 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
3061 * is a multimap.  The locality will be:
3062 *
3063 * - 3 for OSDs in racks foo1 and foo2
3064 * - 8 for OSDs in data center bar
3065 * - -1 for all other OSDs
3066 *
3067 * The lowest possible bucket type is 1, so the best locality
3068 * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
3069 * the OSD itself.
3070 */
3071int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
3072                            struct rb_root *locs)
3073{
3074        struct crush_loc loc;
3075        u16 type_id;
3076
3077        /*
3078         * Instead of repeated get_immediate_parent() calls,
3079         * the location of @id could be obtained with a single
3080         * depth-first traversal.
3081         */
3082        for (;;) {
3083                id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
3084                if (id >= 0)
3085                        return -1;  /* not local */
3086
3087                if (lookup_crush_loc(locs, &loc))
3088                        return type_id;
3089        }
3090}
3091