linux/fs/dlm/dir.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "lowcomms.h"
  16#include "rcom.h"
  17#include "config.h"
  18#include "memory.h"
  19#include "recover.h"
  20#include "util.h"
  21#include "lock.h"
  22#include "dir.h"
  23
  24/*
  25 * We use the upper 16 bits of the hash value to select the directory node.
  26 * Low bits are used for distribution of rsb's among hash buckets on each node.
  27 *
  28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
  29 * num_nodes to the hash value.  This value in the desired range is used as an
  30 * offset into the sorted list of nodeid's to give the particular nodeid.
  31 */
  32
  33int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
  34{
  35        uint32_t node;
  36
  37        if (ls->ls_num_nodes == 1)
  38                return dlm_our_nodeid();
  39        else {
  40                node = (hash >> 16) % ls->ls_total_weight;
  41                return ls->ls_node_array[node];
  42        }
  43}
  44
  45int dlm_dir_nodeid(struct dlm_rsb *r)
  46{
  47        return r->res_dir_nodeid;
  48}
  49
  50void dlm_recover_dir_nodeid(struct dlm_ls *ls)
  51{
  52        struct dlm_rsb *r;
  53
  54        down_read(&ls->ls_root_sem);
  55        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
  56                r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
  57        }
  58        up_read(&ls->ls_root_sem);
  59}
  60
  61int dlm_recover_directory(struct dlm_ls *ls)
  62{
  63        struct dlm_member *memb;
  64        char *b, *last_name = NULL;
  65        int error = -ENOMEM, last_len, nodeid, result;
  66        uint16_t namelen;
  67        unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
  68
  69        log_rinfo(ls, "dlm_recover_directory");
  70
  71        if (dlm_no_directory(ls))
  72                goto out_status;
  73
  74        last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
  75        if (!last_name)
  76                goto out;
  77
  78        list_for_each_entry(memb, &ls->ls_nodes, list) {
  79                if (memb->nodeid == dlm_our_nodeid())
  80                        continue;
  81
  82                memset(last_name, 0, DLM_RESNAME_MAXLEN);
  83                last_len = 0;
  84
  85                for (;;) {
  86                        int left;
  87                        error = dlm_recovery_stopped(ls);
  88                        if (error)
  89                                goto out_free;
  90
  91                        error = dlm_rcom_names(ls, memb->nodeid,
  92                                               last_name, last_len);
  93                        if (error)
  94                                goto out_free;
  95
  96                        cond_resched();
  97
  98                        /*
  99                         * pick namelen/name pairs out of received buffer
 100                         */
 101
 102                        b = ls->ls_recover_buf->rc_buf;
 103                        left = ls->ls_recover_buf->rc_header.h_length;
 104                        left -= sizeof(struct dlm_rcom);
 105
 106                        for (;;) {
 107                                __be16 v;
 108
 109                                error = -EINVAL;
 110                                if (left < sizeof(__be16))
 111                                        goto out_free;
 112
 113                                memcpy(&v, b, sizeof(__be16));
 114                                namelen = be16_to_cpu(v);
 115                                b += sizeof(__be16);
 116                                left -= sizeof(__be16);
 117
 118                                /* namelen of 0xFFFFF marks end of names for
 119                                   this node; namelen of 0 marks end of the
 120                                   buffer */
 121
 122                                if (namelen == 0xFFFF)
 123                                        goto done;
 124                                if (!namelen)
 125                                        break;
 126
 127                                if (namelen > left)
 128                                        goto out_free;
 129
 130                                if (namelen > DLM_RESNAME_MAXLEN)
 131                                        goto out_free;
 132
 133                                error = dlm_master_lookup(ls, memb->nodeid,
 134                                                          b, namelen,
 135                                                          DLM_LU_RECOVER_DIR,
 136                                                          &nodeid, &result);
 137                                if (error) {
 138                                        log_error(ls, "recover_dir lookup %d",
 139                                                  error);
 140                                        goto out_free;
 141                                }
 142
 143                                /* The name was found in rsbtbl, but the
 144                                 * master nodeid is different from
 145                                 * memb->nodeid which says it is the master.
 146                                 * This should not happen. */
 147
 148                                if (result == DLM_LU_MATCH &&
 149                                    nodeid != memb->nodeid) {
 150                                        count_bad++;
 151                                        log_error(ls, "recover_dir lookup %d "
 152                                                  "nodeid %d memb %d bad %u",
 153                                                  result, nodeid, memb->nodeid,
 154                                                  count_bad);
 155                                        print_hex_dump_bytes("dlm_recover_dir ",
 156                                                             DUMP_PREFIX_NONE,
 157                                                             b, namelen);
 158                                }
 159
 160                                /* The name was found in rsbtbl, and the
 161                                 * master nodeid matches memb->nodeid. */
 162
 163                                if (result == DLM_LU_MATCH &&
 164                                    nodeid == memb->nodeid) {
 165                                        count_match++;
 166                                }
 167
 168                                /* The name was not found in rsbtbl and was
 169                                 * added with memb->nodeid as the master. */
 170
 171                                if (result == DLM_LU_ADD) {
 172                                        count_add++;
 173                                }
 174
 175                                last_len = namelen;
 176                                memcpy(last_name, b, namelen);
 177                                b += namelen;
 178                                left -= namelen;
 179                                count++;
 180                        }
 181                }
 182         done:
 183                ;
 184        }
 185
 186 out_status:
 187        error = 0;
 188        dlm_set_recover_status(ls, DLM_RS_DIR);
 189
 190        log_rinfo(ls, "dlm_recover_directory %u in %u new",
 191                  count, count_add);
 192 out_free:
 193        kfree(last_name);
 194 out:
 195        return error;
 196}
 197
 198static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 199{
 200        struct dlm_rsb *r;
 201        uint32_t hash, bucket;
 202        int rv;
 203
 204        hash = jhash(name, len, 0);
 205        bucket = hash & (ls->ls_rsbtbl_size - 1);
 206
 207        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 208        rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 209        if (rv)
 210                rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
 211                                         name, len, &r);
 212        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 213
 214        if (!rv)
 215                return r;
 216
 217        down_read(&ls->ls_root_sem);
 218        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 219                if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 220                        up_read(&ls->ls_root_sem);
 221                        log_debug(ls, "find_rsb_root revert to root_list %s",
 222                                  r->res_name);
 223                        return r;
 224                }
 225        }
 226        up_read(&ls->ls_root_sem);
 227        return NULL;
 228}
 229
 230/* Find the rsb where we left off (or start again), then send rsb names
 231   for rsb's we're master of and whose directory node matches the requesting
 232   node.  inbuf is the rsb name last sent, inlen is the name's length */
 233
 234void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 235                           char *outbuf, int outlen, int nodeid)
 236{
 237        struct list_head *list;
 238        struct dlm_rsb *r;
 239        int offset = 0, dir_nodeid;
 240        __be16 be_namelen;
 241
 242        down_read(&ls->ls_root_sem);
 243
 244        if (inlen > 1) {
 245                r = find_rsb_root(ls, inbuf, inlen);
 246                if (!r) {
 247                        inbuf[inlen - 1] = '\0';
 248                        log_error(ls, "copy_master_names from %d start %d %s",
 249                                  nodeid, inlen, inbuf);
 250                        goto out;
 251                }
 252                list = r->res_root_list.next;
 253        } else {
 254                list = ls->ls_root_list.next;
 255        }
 256
 257        for (offset = 0; list != &ls->ls_root_list; list = list->next) {
 258                r = list_entry(list, struct dlm_rsb, res_root_list);
 259                if (r->res_nodeid)
 260                        continue;
 261
 262                dir_nodeid = dlm_dir_nodeid(r);
 263                if (dir_nodeid != nodeid)
 264                        continue;
 265
 266                /*
 267                 * The block ends when we can't fit the following in the
 268                 * remaining buffer space:
 269                 * namelen (uint16_t) +
 270                 * name (r->res_length) +
 271                 * end-of-block record 0x0000 (uint16_t)
 272                 */
 273
 274                if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
 275                        /* Write end-of-block record */
 276                        be_namelen = cpu_to_be16(0);
 277                        memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 278                        offset += sizeof(__be16);
 279                        ls->ls_recover_dir_sent_msg++;
 280                        goto out;
 281                }
 282
 283                be_namelen = cpu_to_be16(r->res_length);
 284                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 285                offset += sizeof(__be16);
 286                memcpy(outbuf + offset, r->res_name, r->res_length);
 287                offset += r->res_length;
 288                ls->ls_recover_dir_sent_res++;
 289        }
 290
 291        /*
 292         * If we've reached the end of the list (and there's room) write a
 293         * terminating record.
 294         */
 295
 296        if ((list == &ls->ls_root_list) &&
 297            (offset + sizeof(uint16_t) <= outlen)) {
 298                be_namelen = cpu_to_be16(0xFFFF);
 299                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 300                offset += sizeof(__be16);
 301                ls->ls_recover_dir_sent_msg++;
 302        }
 303 out:
 304        up_read(&ls->ls_root_sem);
 305}
 306
 307