linux/fs/dlm/dir.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "lowcomms.h"
  16#include "rcom.h"
  17#include "config.h"
  18#include "memory.h"
  19#include "recover.h"
  20#include "util.h"
  21#include "lock.h"
  22#include "dir.h"
  23
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted (0 to ls_total_weight-1), we apply a modulus
 * of ls_total_weight to the hash value.  This value in the desired range is
 * used as an offset into ls_node_array, the sorted list of nodeid's, to give
 * the particular nodeid.
 */
  32
  33int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
  34{
  35        uint32_t node;
  36
  37        if (ls->ls_num_nodes == 1)
  38                return dlm_our_nodeid();
  39        else {
  40                node = (hash >> 16) % ls->ls_total_weight;
  41                return ls->ls_node_array[node];
  42        }
  43}
  44
  45int dlm_dir_nodeid(struct dlm_rsb *r)
  46{
  47        return r->res_dir_nodeid;
  48}
  49
  50void dlm_recover_dir_nodeid(struct dlm_ls *ls)
  51{
  52        struct dlm_rsb *r;
  53
  54        down_read(&ls->ls_root_sem);
  55        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
  56                r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
  57        }
  58        up_read(&ls->ls_root_sem);
  59}
  60
  61int dlm_recover_directory(struct dlm_ls *ls)
  62{
  63        struct dlm_member *memb;
  64        char *b, *last_name = NULL;
  65        int error = -ENOMEM, last_len, nodeid, result;
  66        uint16_t namelen;
  67        unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
  68
  69        log_rinfo(ls, "dlm_recover_directory");
  70
  71        if (dlm_no_directory(ls))
  72                goto out_status;
  73
  74        last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
  75        if (!last_name)
  76                goto out;
  77
  78        list_for_each_entry(memb, &ls->ls_nodes, list) {
  79                if (memb->nodeid == dlm_our_nodeid())
  80                        continue;
  81
  82                memset(last_name, 0, DLM_RESNAME_MAXLEN);
  83                last_len = 0;
  84
  85                for (;;) {
  86                        int left;
  87                        error = dlm_recovery_stopped(ls);
  88                        if (error) {
  89                                error = -EINTR;
  90                                goto out_free;
  91                        }
  92
  93                        error = dlm_rcom_names(ls, memb->nodeid,
  94                                               last_name, last_len);
  95                        if (error)
  96                                goto out_free;
  97
  98                        cond_resched();
  99
 100                        /*
 101                         * pick namelen/name pairs out of received buffer
 102                         */
 103
 104                        b = ls->ls_recover_buf->rc_buf;
 105                        left = ls->ls_recover_buf->rc_header.h_length;
 106                        left -= sizeof(struct dlm_rcom);
 107
 108                        for (;;) {
 109                                __be16 v;
 110
 111                                error = -EINVAL;
 112                                if (left < sizeof(__be16))
 113                                        goto out_free;
 114
 115                                memcpy(&v, b, sizeof(__be16));
 116                                namelen = be16_to_cpu(v);
 117                                b += sizeof(__be16);
 118                                left -= sizeof(__be16);
 119
 120                                /* namelen of 0xFFFFF marks end of names for
 121                                   this node; namelen of 0 marks end of the
 122                                   buffer */
 123
 124                                if (namelen == 0xFFFF)
 125                                        goto done;
 126                                if (!namelen)
 127                                        break;
 128
 129                                if (namelen > left)
 130                                        goto out_free;
 131
 132                                if (namelen > DLM_RESNAME_MAXLEN)
 133                                        goto out_free;
 134
 135                                error = dlm_master_lookup(ls, memb->nodeid,
 136                                                          b, namelen,
 137                                                          DLM_LU_RECOVER_DIR,
 138                                                          &nodeid, &result);
 139                                if (error) {
 140                                        log_error(ls, "recover_dir lookup %d",
 141                                                  error);
 142                                        goto out_free;
 143                                }
 144
 145                                /* The name was found in rsbtbl, but the
 146                                 * master nodeid is different from
 147                                 * memb->nodeid which says it is the master.
 148                                 * This should not happen. */
 149
 150                                if (result == DLM_LU_MATCH &&
 151                                    nodeid != memb->nodeid) {
 152                                        count_bad++;
 153                                        log_error(ls, "recover_dir lookup %d "
 154                                                  "nodeid %d memb %d bad %u",
 155                                                  result, nodeid, memb->nodeid,
 156                                                  count_bad);
 157                                        print_hex_dump_bytes("dlm_recover_dir ",
 158                                                             DUMP_PREFIX_NONE,
 159                                                             b, namelen);
 160                                }
 161
 162                                /* The name was found in rsbtbl, and the
 163                                 * master nodeid matches memb->nodeid. */
 164
 165                                if (result == DLM_LU_MATCH &&
 166                                    nodeid == memb->nodeid) {
 167                                        count_match++;
 168                                }
 169
 170                                /* The name was not found in rsbtbl and was
 171                                 * added with memb->nodeid as the master. */
 172
 173                                if (result == DLM_LU_ADD) {
 174                                        count_add++;
 175                                }
 176
 177                                last_len = namelen;
 178                                memcpy(last_name, b, namelen);
 179                                b += namelen;
 180                                left -= namelen;
 181                                count++;
 182                        }
 183                }
 184         done:
 185                ;
 186        }
 187
 188 out_status:
 189        error = 0;
 190        dlm_set_recover_status(ls, DLM_RS_DIR);
 191
 192        log_rinfo(ls, "dlm_recover_directory %u in %u new",
 193                  count, count_add);
 194 out_free:
 195        kfree(last_name);
 196 out:
 197        return error;
 198}
 199
 200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 201{
 202        struct dlm_rsb *r;
 203        uint32_t hash, bucket;
 204        int rv;
 205
 206        hash = jhash(name, len, 0);
 207        bucket = hash & (ls->ls_rsbtbl_size - 1);
 208
 209        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 210        rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 211        if (rv)
 212                rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
 213                                         name, len, &r);
 214        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 215
 216        if (!rv)
 217                return r;
 218
 219        down_read(&ls->ls_root_sem);
 220        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 221                if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 222                        up_read(&ls->ls_root_sem);
 223                        log_debug(ls, "find_rsb_root revert to root_list %s",
 224                                  r->res_name);
 225                        return r;
 226                }
 227        }
 228        up_read(&ls->ls_root_sem);
 229        return NULL;
 230}
 231
 232/* Find the rsb where we left off (or start again), then send rsb names
 233   for rsb's we're master of and whose directory node matches the requesting
 234   node.  inbuf is the rsb name last sent, inlen is the name's length */
 235
 236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 237                           char *outbuf, int outlen, int nodeid)
 238{
 239        struct list_head *list;
 240        struct dlm_rsb *r;
 241        int offset = 0, dir_nodeid;
 242        __be16 be_namelen;
 243
 244        down_read(&ls->ls_root_sem);
 245
 246        if (inlen > 1) {
 247                r = find_rsb_root(ls, inbuf, inlen);
 248                if (!r) {
 249                        inbuf[inlen - 1] = '\0';
 250                        log_error(ls, "copy_master_names from %d start %d %s",
 251                                  nodeid, inlen, inbuf);
 252                        goto out;
 253                }
 254                list = r->res_root_list.next;
 255        } else {
 256                list = ls->ls_root_list.next;
 257        }
 258
 259        for (offset = 0; list != &ls->ls_root_list; list = list->next) {
 260                r = list_entry(list, struct dlm_rsb, res_root_list);
 261                if (r->res_nodeid)
 262                        continue;
 263
 264                dir_nodeid = dlm_dir_nodeid(r);
 265                if (dir_nodeid != nodeid)
 266                        continue;
 267
 268                /*
 269                 * The block ends when we can't fit the following in the
 270                 * remaining buffer space:
 271                 * namelen (uint16_t) +
 272                 * name (r->res_length) +
 273                 * end-of-block record 0x0000 (uint16_t)
 274                 */
 275
 276                if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
 277                        /* Write end-of-block record */
 278                        be_namelen = cpu_to_be16(0);
 279                        memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 280                        offset += sizeof(__be16);
 281                        ls->ls_recover_dir_sent_msg++;
 282                        goto out;
 283                }
 284
 285                be_namelen = cpu_to_be16(r->res_length);
 286                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 287                offset += sizeof(__be16);
 288                memcpy(outbuf + offset, r->res_name, r->res_length);
 289                offset += r->res_length;
 290                ls->ls_recover_dir_sent_res++;
 291        }
 292
 293        /*
 294         * If we've reached the end of the list (and there's room) write a
 295         * terminating record.
 296         */
 297
 298        if ((list == &ls->ls_root_list) &&
 299            (offset + sizeof(uint16_t) <= outlen)) {
 300                be_namelen = cpu_to_be16(0xFFFF);
 301                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 302                offset += sizeof(__be16);
 303                ls->ls_recover_dir_sent_msg++;
 304        }
 305 out:
 306        up_read(&ls->ls_root_sem);
 307}
 308
 309