linux/fs/dlm/dir.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "lowcomms.h"
  18#include "rcom.h"
  19#include "config.h"
  20#include "memory.h"
  21#include "recover.h"
  22#include "util.h"
  23#include "lock.h"
  24#include "dir.h"
  25
  26
  27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
  28{
  29        spin_lock(&ls->ls_recover_list_lock);
  30        list_add(&de->list, &ls->ls_recover_list);
  31        spin_unlock(&ls->ls_recover_list_lock);
  32}
  33
  34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
  35{
  36        int found = 0;
  37        struct dlm_direntry *de;
  38
  39        spin_lock(&ls->ls_recover_list_lock);
  40        list_for_each_entry(de, &ls->ls_recover_list, list) {
  41                if (de->length == len) {
  42                        list_del(&de->list);
  43                        de->master_nodeid = 0;
  44                        memset(de->name, 0, len);
  45                        found = 1;
  46                        break;
  47                }
  48        }
  49        spin_unlock(&ls->ls_recover_list_lock);
  50
  51        if (!found)
  52                de = kzalloc(sizeof(struct dlm_direntry) + len,
  53                             ls->ls_allocation);
  54        return de;
  55}
  56
  57void dlm_clear_free_entries(struct dlm_ls *ls)
  58{
  59        struct dlm_direntry *de;
  60
  61        spin_lock(&ls->ls_recover_list_lock);
  62        while (!list_empty(&ls->ls_recover_list)) {
  63                de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
  64                                list);
  65                list_del(&de->list);
  66                kfree(de);
  67        }
  68        spin_unlock(&ls->ls_recover_list_lock);
  69}
  70
  71/*
  72 * We use the upper 16 bits of the hash value to select the directory node.
  73 * Low bits are used for distribution of rsb's among hash buckets on each node.
  74 *
  75 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
  76 * num_nodes to the hash value.  This value in the desired range is used as an
  77 * offset into the sorted list of nodeid's to give the particular nodeid.
  78 */
  79
  80int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
  81{
  82        struct list_head *tmp;
  83        struct dlm_member *memb = NULL;
  84        uint32_t node, n = 0;
  85        int nodeid;
  86
  87        if (ls->ls_num_nodes == 1) {
  88                nodeid = dlm_our_nodeid();
  89                goto out;
  90        }
  91
  92        if (ls->ls_node_array) {
  93                node = (hash >> 16) % ls->ls_total_weight;
  94                nodeid = ls->ls_node_array[node];
  95                goto out;
  96        }
  97
  98        /* make_member_array() failed to kmalloc ls_node_array... */
  99
 100        node = (hash >> 16) % ls->ls_num_nodes;
 101
 102        list_for_each(tmp, &ls->ls_nodes) {
 103                if (n++ != node)
 104                        continue;
 105                memb = list_entry(tmp, struct dlm_member, list);
 106                break;
 107        }
 108
 109        DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
 110                                 ls->ls_num_nodes, n, node););
 111        nodeid = memb->nodeid;
 112 out:
 113        return nodeid;
 114}
 115
 116int dlm_dir_nodeid(struct dlm_rsb *r)
 117{
 118        return dlm_hash2nodeid(r->res_ls, r->res_hash);
 119}
 120
 121static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
 122{
 123        uint32_t val;
 124
 125        val = jhash(name, len, 0);
 126        val &= (ls->ls_dirtbl_size - 1);
 127
 128        return val;
 129}
 130
 131static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
 132{
 133        uint32_t bucket;
 134
 135        bucket = dir_hash(ls, de->name, de->length);
 136        list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
 137}
 138
 139static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
 140                                          int namelen, uint32_t bucket)
 141{
 142        struct dlm_direntry *de;
 143
 144        list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
 145                if (de->length == namelen && !memcmp(name, de->name, namelen))
 146                        goto out;
 147        }
 148        de = NULL;
 149 out:
 150        return de;
 151}
 152
 153void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
 154{
 155        struct dlm_direntry *de;
 156        uint32_t bucket;
 157
 158        bucket = dir_hash(ls, name, namelen);
 159
 160        spin_lock(&ls->ls_dirtbl[bucket].lock);
 161
 162        de = search_bucket(ls, name, namelen, bucket);
 163
 164        if (!de) {
 165                log_error(ls, "remove fr %u none", nodeid);
 166                goto out;
 167        }
 168
 169        if (de->master_nodeid != nodeid) {
 170                log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
 171                goto out;
 172        }
 173
 174        list_del(&de->list);
 175        kfree(de);
 176 out:
 177        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 178}
 179
 180void dlm_dir_clear(struct dlm_ls *ls)
 181{
 182        struct list_head *head;
 183        struct dlm_direntry *de;
 184        int i;
 185
 186        DLM_ASSERT(list_empty(&ls->ls_recover_list), );
 187
 188        for (i = 0; i < ls->ls_dirtbl_size; i++) {
 189                spin_lock(&ls->ls_dirtbl[i].lock);
 190                head = &ls->ls_dirtbl[i].list;
 191                while (!list_empty(head)) {
 192                        de = list_entry(head->next, struct dlm_direntry, list);
 193                        list_del(&de->list);
 194                        put_free_de(ls, de);
 195                }
 196                spin_unlock(&ls->ls_dirtbl[i].lock);
 197        }
 198}
 199
 200int dlm_recover_directory(struct dlm_ls *ls)
 201{
 202        struct dlm_member *memb;
 203        struct dlm_direntry *de;
 204        char *b, *last_name = NULL;
 205        int error = -ENOMEM, last_len, count = 0;
 206        uint16_t namelen;
 207
 208        log_debug(ls, "dlm_recover_directory");
 209
 210        if (dlm_no_directory(ls))
 211                goto out_status;
 212
 213        dlm_dir_clear(ls);
 214
 215        last_name = kmalloc(DLM_RESNAME_MAXLEN, ls->ls_allocation);
 216        if (!last_name)
 217                goto out;
 218
 219        list_for_each_entry(memb, &ls->ls_nodes, list) {
 220                memset(last_name, 0, DLM_RESNAME_MAXLEN);
 221                last_len = 0;
 222
 223                for (;;) {
 224                        int left;
 225                        error = dlm_recovery_stopped(ls);
 226                        if (error)
 227                                goto out_free;
 228
 229                        error = dlm_rcom_names(ls, memb->nodeid,
 230                                               last_name, last_len);
 231                        if (error)
 232                                goto out_free;
 233
 234                        schedule();
 235
 236                        /*
 237                         * pick namelen/name pairs out of received buffer
 238                         */
 239
 240                        b = ls->ls_recover_buf->rc_buf;
 241                        left = ls->ls_recover_buf->rc_header.h_length;
 242                        left -= sizeof(struct dlm_rcom);
 243
 244                        for (;;) {
 245                                __be16 v;
 246
 247                                error = -EINVAL;
 248                                if (left < sizeof(__be16))
 249                                        goto out_free;
 250
 251                                memcpy(&v, b, sizeof(__be16));
 252                                namelen = be16_to_cpu(v);
 253                                b += sizeof(__be16);
 254                                left -= sizeof(__be16);
 255
 256                                /* namelen of 0xFFFFF marks end of names for
 257                                   this node; namelen of 0 marks end of the
 258                                   buffer */
 259
 260                                if (namelen == 0xFFFF)
 261                                        goto done;
 262                                if (!namelen)
 263                                        break;
 264
 265                                if (namelen > left)
 266                                        goto out_free;
 267
 268                                if (namelen > DLM_RESNAME_MAXLEN)
 269                                        goto out_free;
 270
 271                                error = -ENOMEM;
 272                                de = get_free_de(ls, namelen);
 273                                if (!de)
 274                                        goto out_free;
 275
 276                                de->master_nodeid = memb->nodeid;
 277                                de->length = namelen;
 278                                last_len = namelen;
 279                                memcpy(de->name, b, namelen);
 280                                memcpy(last_name, b, namelen);
 281                                b += namelen;
 282                                left -= namelen;
 283
 284                                add_entry_to_hash(ls, de);
 285                                count++;
 286                        }
 287                }
 288         done:
 289                ;
 290        }
 291
 292 out_status:
 293        error = 0;
 294        dlm_set_recover_status(ls, DLM_RS_DIR);
 295        log_debug(ls, "dlm_recover_directory %d entries", count);
 296 out_free:
 297        kfree(last_name);
 298 out:
 299        dlm_clear_free_entries(ls);
 300        return error;
 301}
 302
 303static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
 304                     int namelen, int *r_nodeid)
 305{
 306        struct dlm_direntry *de, *tmp;
 307        uint32_t bucket;
 308
 309        bucket = dir_hash(ls, name, namelen);
 310
 311        spin_lock(&ls->ls_dirtbl[bucket].lock);
 312        de = search_bucket(ls, name, namelen, bucket);
 313        if (de) {
 314                *r_nodeid = de->master_nodeid;
 315                spin_unlock(&ls->ls_dirtbl[bucket].lock);
 316                if (*r_nodeid == nodeid)
 317                        return -EEXIST;
 318                return 0;
 319        }
 320
 321        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 322
 323        if (namelen > DLM_RESNAME_MAXLEN)
 324                return -EINVAL;
 325
 326        de = kzalloc(sizeof(struct dlm_direntry) + namelen, ls->ls_allocation);
 327        if (!de)
 328                return -ENOMEM;
 329
 330        de->master_nodeid = nodeid;
 331        de->length = namelen;
 332        memcpy(de->name, name, namelen);
 333
 334        spin_lock(&ls->ls_dirtbl[bucket].lock);
 335        tmp = search_bucket(ls, name, namelen, bucket);
 336        if (tmp) {
 337                kfree(de);
 338                de = tmp;
 339        } else {
 340                list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
 341        }
 342        *r_nodeid = de->master_nodeid;
 343        spin_unlock(&ls->ls_dirtbl[bucket].lock);
 344        return 0;
 345}
 346
 347int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
 348                   int *r_nodeid)
 349{
 350        return get_entry(ls, nodeid, name, namelen, r_nodeid);
 351}
 352
 353static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 354{
 355        struct dlm_rsb *r;
 356
 357        down_read(&ls->ls_root_sem);
 358        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 359                if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 360                        up_read(&ls->ls_root_sem);
 361                        return r;
 362                }
 363        }
 364        up_read(&ls->ls_root_sem);
 365        return NULL;
 366}
 367
 368/* Find the rsb where we left off (or start again), then send rsb names
 369   for rsb's we're master of and whose directory node matches the requesting
 370   node.  inbuf is the rsb name last sent, inlen is the name's length */
 371
 372void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 373                           char *outbuf, int outlen, int nodeid)
 374{
 375        struct list_head *list;
 376        struct dlm_rsb *r;
 377        int offset = 0, dir_nodeid;
 378        __be16 be_namelen;
 379
 380        down_read(&ls->ls_root_sem);
 381
 382        if (inlen > 1) {
 383                r = find_rsb_root(ls, inbuf, inlen);
 384                if (!r) {
 385                        inbuf[inlen - 1] = '\0';
 386                        log_error(ls, "copy_master_names from %d start %d %s",
 387                                  nodeid, inlen, inbuf);
 388                        goto out;
 389                }
 390                list = r->res_root_list.next;
 391        } else {
 392                list = ls->ls_root_list.next;
 393        }
 394
 395        for (offset = 0; list != &ls->ls_root_list; list = list->next) {
 396                r = list_entry(list, struct dlm_rsb, res_root_list);
 397                if (r->res_nodeid)
 398                        continue;
 399
 400                dir_nodeid = dlm_dir_nodeid(r);
 401                if (dir_nodeid != nodeid)
 402                        continue;
 403
 404                /*
 405                 * The block ends when we can't fit the following in the
 406                 * remaining buffer space:
 407                 * namelen (uint16_t) +
 408                 * name (r->res_length) +
 409                 * end-of-block record 0x0000 (uint16_t)
 410                 */
 411
 412                if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
 413                        /* Write end-of-block record */
 414                        be_namelen = cpu_to_be16(0);
 415                        memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 416                        offset += sizeof(__be16);
 417                        goto out;
 418                }
 419
 420                be_namelen = cpu_to_be16(r->res_length);
 421                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 422                offset += sizeof(__be16);
 423                memcpy(outbuf + offset, r->res_name, r->res_length);
 424                offset += r->res_length;
 425        }
 426
 427        /*
 428         * If we've reached the end of the list (and there's room) write a
 429         * terminating record.
 430         */
 431
 432        if ((list == &ls->ls_root_list) &&
 433            (offset + sizeof(uint16_t) <= outlen)) {
 434                be_namelen = cpu_to_be16(0xFFFF);
 435                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 436                offset += sizeof(__be16);
 437        }
 438
 439 out:
 440        up_read(&ls->ls_root_sem);
 441}
 442
 443