linux/fs/dlm/lock.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13/* Central locking logic has four stages:
  14
  15   dlm_lock()
  16   dlm_unlock()
  17
  18   request_lock(ls, lkb)
  19   convert_lock(ls, lkb)
  20   unlock_lock(ls, lkb)
  21   cancel_lock(ls, lkb)
  22
  23   _request_lock(r, lkb)
  24   _convert_lock(r, lkb)
  25   _unlock_lock(r, lkb)
  26   _cancel_lock(r, lkb)
  27
  28   do_request(r, lkb)
  29   do_convert(r, lkb)
  30   do_unlock(r, lkb)
  31   do_cancel(r, lkb)
  32
  33   Stage 1 (lock, unlock) is mainly about checking input args and
  34   splitting into one of the four main operations:
  35
  36       dlm_lock          = request_lock
  37       dlm_lock+CONVERT  = convert_lock
  38       dlm_unlock        = unlock_lock
  39       dlm_unlock+CANCEL = cancel_lock
  40
  41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
  42   provided to the next stage.
  43
  44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
  45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
  46
  47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
  48   given rsb and lkb and queues callbacks.
  49
  50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
  51   function being executed on the remote node.  The connecting send/receive
  52   calls on local (L) and remote (R) nodes:
  53
  54   L: send_xxxx()              ->  R: receive_xxxx()
  55                                   R: do_xxxx()
  56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
  57*/
  58#include <linux/types.h>
  59#include <linux/rbtree.h>
  60#include <linux/slab.h>
  61#include "dlm_internal.h"
  62#include <linux/dlm_device.h>
  63#include "memory.h"
  64#include "lowcomms.h"
  65#include "requestqueue.h"
  66#include "util.h"
  67#include "dir.h"
  68#include "member.h"
  69#include "lockspace.h"
  70#include "ast.h"
  71#include "lock.h"
  72#include "rcom.h"
  73#include "recover.h"
  74#include "lvb_table.h"
  75#include "user.h"
  76#include "config.h"
  77
  78static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
  79static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
  80static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  81static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
  82static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
  83static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
  84static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
  85static int send_remove(struct dlm_rsb *r);
  86static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  87static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  88static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
  89                                    struct dlm_message *ms);
  90static int receive_extralen(struct dlm_message *ms);
  91static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
  92static void del_timeout(struct dlm_lkb *lkb);
  93static void toss_rsb(struct kref *kref);
  94
  95/*
  96 * Lock compatibilty matrix - thanks Steve
  97 * UN = Unlocked state. Not really a state, used as a flag
  98 * PD = Padding. Used to make the matrix a nice power of two in size
  99 * Other states are the same as the VMS DLM.
 100 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 101 */
 102
 103static const int __dlm_compat_matrix[8][8] = {
 104      /* UN NL CR CW PR PW EX PD */
 105        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
 106        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
 107        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
 108        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
 109        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
 110        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
 111        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
 112        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 113};
 114
 115/*
 116 * This defines the direction of transfer of LVB data.
 117 * Granted mode is the row; requested mode is the column.
 118 * Usage: matrix[grmode+1][rqmode+1]
 119 * 1 = LVB is returned to the caller
 120 * 0 = LVB is written to the resource
 121 * -1 = nothing happens to the LVB
 122 */
 123
 124const int dlm_lvb_operations[8][8] = {
 125        /* UN   NL  CR  CW  PR  PW  EX  PD*/
 126        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
 127        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
 128        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
 129        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
 130        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
 131        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
 132        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
 133        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
 134};
 135
 136#define modes_compat(gr, rq) \
 137        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
 138
 139int dlm_modes_compat(int mode1, int mode2)
 140{
 141        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 142}
 143
 144/*
 145 * Compatibility matrix for conversions with QUECVT set.
 146 * Granted mode is the row; requested mode is the column.
 147 * Usage: matrix[grmode+1][rqmode+1]
 148 */
 149
 150static const int __quecvt_compat_matrix[8][8] = {
 151      /* UN NL CR CW PR PW EX PD */
 152        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
 153        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
 154        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
 155        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
 156        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
 157        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
 158        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
 159        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 160};
 161
 162void dlm_print_lkb(struct dlm_lkb *lkb)
 163{
 164        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
 165               "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
 166               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 167               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
 168               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
 169               (unsigned long long)lkb->lkb_recover_seq);
 170}
 171
 172static void dlm_print_rsb(struct dlm_rsb *r)
 173{
 174        printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
 175               "rlc %d name %s\n",
 176               r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
 177               r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
 178               r->res_name);
 179}
 180
 181void dlm_dump_rsb(struct dlm_rsb *r)
 182{
 183        struct dlm_lkb *lkb;
 184
 185        dlm_print_rsb(r);
 186
 187        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 188               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
 189        printk(KERN_ERR "rsb lookup list\n");
 190        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
 191                dlm_print_lkb(lkb);
 192        printk(KERN_ERR "rsb grant queue:\n");
 193        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
 194                dlm_print_lkb(lkb);
 195        printk(KERN_ERR "rsb convert queue:\n");
 196        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
 197                dlm_print_lkb(lkb);
 198        printk(KERN_ERR "rsb wait queue:\n");
 199        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
 200                dlm_print_lkb(lkb);
 201}
 202
 203/* Threads cannot use the lockspace while it's being recovered */
 204
 205static inline void dlm_lock_recovery(struct dlm_ls *ls)
 206{
 207        down_read(&ls->ls_in_recovery);
 208}
 209
 210void dlm_unlock_recovery(struct dlm_ls *ls)
 211{
 212        up_read(&ls->ls_in_recovery);
 213}
 214
 215int dlm_lock_recovery_try(struct dlm_ls *ls)
 216{
 217        return down_read_trylock(&ls->ls_in_recovery);
 218}
 219
 220static inline int can_be_queued(struct dlm_lkb *lkb)
 221{
 222        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
 223}
 224
 225static inline int force_blocking_asts(struct dlm_lkb *lkb)
 226{
 227        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
 228}
 229
 230static inline int is_demoted(struct dlm_lkb *lkb)
 231{
 232        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 233}
 234
 235static inline int is_altmode(struct dlm_lkb *lkb)
 236{
 237        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
 238}
 239
 240static inline int is_granted(struct dlm_lkb *lkb)
 241{
 242        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
 243}
 244
 245static inline int is_remote(struct dlm_rsb *r)
 246{
 247        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
 248        return !!r->res_nodeid;
 249}
 250
 251static inline int is_process_copy(struct dlm_lkb *lkb)
 252{
 253        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
 254}
 255
 256static inline int is_master_copy(struct dlm_lkb *lkb)
 257{
 258        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 259}
 260
 261static inline int middle_conversion(struct dlm_lkb *lkb)
 262{
 263        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
 264            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
 265                return 1;
 266        return 0;
 267}
 268
 269static inline int down_conversion(struct dlm_lkb *lkb)
 270{
 271        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 272}
 273
 274static inline int is_overlap_unlock(struct dlm_lkb *lkb)
 275{
 276        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
 277}
 278
 279static inline int is_overlap_cancel(struct dlm_lkb *lkb)
 280{
 281        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
 282}
 283
 284static inline int is_overlap(struct dlm_lkb *lkb)
 285{
 286        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
 287                                  DLM_IFL_OVERLAP_CANCEL));
 288}
 289
 290static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 291{
 292        if (is_master_copy(lkb))
 293                return;
 294
 295        del_timeout(lkb);
 296
 297        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 298
 299        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
 300           timeout caused the cancel then return -ETIMEDOUT */
 301        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
 302                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
 303                rv = -ETIMEDOUT;
 304        }
 305
 306        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
 307                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
 308                rv = -EDEADLK;
 309        }
 310
 311        dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
 312}
 313
 314static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 315{
 316        queue_cast(r, lkb,
 317                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
 318}
 319
 320static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 321{
 322        if (is_master_copy(lkb)) {
 323                send_bast(r, lkb, rqmode);
 324        } else {
 325                dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
 326        }
 327}
 328
 329/*
 330 * Basic operations on rsb's and lkb's
 331 */
 332
 333/* This is only called to add a reference when the code already holds
 334   a valid reference to the rsb, so there's no need for locking. */
 335
 336static inline void hold_rsb(struct dlm_rsb *r)
 337{
 338        kref_get(&r->res_ref);
 339}
 340
 341void dlm_hold_rsb(struct dlm_rsb *r)
 342{
 343        hold_rsb(r);
 344}
 345
 346/* When all references to the rsb are gone it's transferred to
 347   the tossed list for later disposal. */
 348
 349static void put_rsb(struct dlm_rsb *r)
 350{
 351        struct dlm_ls *ls = r->res_ls;
 352        uint32_t bucket = r->res_bucket;
 353
 354        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 355        kref_put(&r->res_ref, toss_rsb);
 356        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 357}
 358
 359void dlm_put_rsb(struct dlm_rsb *r)
 360{
 361        put_rsb(r);
 362}
 363
 364static int pre_rsb_struct(struct dlm_ls *ls)
 365{
 366        struct dlm_rsb *r1, *r2;
 367        int count = 0;
 368
 369        spin_lock(&ls->ls_new_rsb_spin);
 370        if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
 371                spin_unlock(&ls->ls_new_rsb_spin);
 372                return 0;
 373        }
 374        spin_unlock(&ls->ls_new_rsb_spin);
 375
 376        r1 = dlm_allocate_rsb(ls);
 377        r2 = dlm_allocate_rsb(ls);
 378
 379        spin_lock(&ls->ls_new_rsb_spin);
 380        if (r1) {
 381                list_add(&r1->res_hashchain, &ls->ls_new_rsb);
 382                ls->ls_new_rsb_count++;
 383        }
 384        if (r2) {
 385                list_add(&r2->res_hashchain, &ls->ls_new_rsb);
 386                ls->ls_new_rsb_count++;
 387        }
 388        count = ls->ls_new_rsb_count;
 389        spin_unlock(&ls->ls_new_rsb_spin);
 390
 391        if (!count)
 392                return -ENOMEM;
 393        return 0;
 394}
 395
 396/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
 397   unlock any spinlocks, go back and call pre_rsb_struct again.
 398   Otherwise, take an rsb off the list and return it. */
 399
 400static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 401                          struct dlm_rsb **r_ret)
 402{
 403        struct dlm_rsb *r;
 404        int count;
 405
 406        spin_lock(&ls->ls_new_rsb_spin);
 407        if (list_empty(&ls->ls_new_rsb)) {
 408                count = ls->ls_new_rsb_count;
 409                spin_unlock(&ls->ls_new_rsb_spin);
 410                log_debug(ls, "find_rsb retry %d %d %s",
 411                          count, dlm_config.ci_new_rsb_count, name);
 412                return -EAGAIN;
 413        }
 414
 415        r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 416        list_del(&r->res_hashchain);
 417        /* Convert the empty list_head to a NULL rb_node for tree usage: */
 418        memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 419        ls->ls_new_rsb_count--;
 420        spin_unlock(&ls->ls_new_rsb_spin);
 421
 422        r->res_ls = ls;
 423        r->res_length = len;
 424        memcpy(r->res_name, name, len);
 425        mutex_init(&r->res_mutex);
 426
 427        INIT_LIST_HEAD(&r->res_lookup);
 428        INIT_LIST_HEAD(&r->res_grantqueue);
 429        INIT_LIST_HEAD(&r->res_convertqueue);
 430        INIT_LIST_HEAD(&r->res_waitqueue);
 431        INIT_LIST_HEAD(&r->res_root_list);
 432        INIT_LIST_HEAD(&r->res_recover_list);
 433
 434        *r_ret = r;
 435        return 0;
 436}
 437
 438static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
 439{
 440        char maxname[DLM_RESNAME_MAXLEN];
 441
 442        memset(maxname, 0, DLM_RESNAME_MAXLEN);
 443        memcpy(maxname, name, nlen);
 444        return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
 445}
 446
 447int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
 448                        struct dlm_rsb **r_ret)
 449{
 450        struct rb_node *node = tree->rb_node;
 451        struct dlm_rsb *r;
 452        int rc;
 453
 454        while (node) {
 455                r = rb_entry(node, struct dlm_rsb, res_hashnode);
 456                rc = rsb_cmp(r, name, len);
 457                if (rc < 0)
 458                        node = node->rb_left;
 459                else if (rc > 0)
 460                        node = node->rb_right;
 461                else
 462                        goto found;
 463        }
 464        *r_ret = NULL;
 465        return -EBADR;
 466
 467 found:
 468        *r_ret = r;
 469        return 0;
 470}
 471
 472static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
 473{
 474        struct rb_node **newn = &tree->rb_node;
 475        struct rb_node *parent = NULL;
 476        int rc;
 477
 478        while (*newn) {
 479                struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
 480                                               res_hashnode);
 481
 482                parent = *newn;
 483                rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
 484                if (rc < 0)
 485                        newn = &parent->rb_left;
 486                else if (rc > 0)
 487                        newn = &parent->rb_right;
 488                else {
 489                        log_print("rsb_insert match");
 490                        dlm_dump_rsb(rsb);
 491                        dlm_dump_rsb(cur);
 492                        return -EEXIST;
 493                }
 494        }
 495
 496        rb_link_node(&rsb->res_hashnode, parent, newn);
 497        rb_insert_color(&rsb->res_hashnode, tree);
 498        return 0;
 499}
 500
 501/*
 502 * Find rsb in rsbtbl and potentially create/add one
 503 *
 504 * Delaying the release of rsb's has a similar benefit to applications keeping
 505 * NL locks on an rsb, but without the guarantee that the cached master value
 506 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 507 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 508 * to excessive master lookups and removals if we don't delay the release.
 509 *
 510 * Searching for an rsb means looking through both the normal list and toss
 511 * list.  When found on the toss list the rsb is moved to the normal list with
 512 * ref count of 1; when found on normal list the ref count is incremented.
 513 *
 514 * rsb's on the keep list are being used locally and refcounted.
 515 * rsb's on the toss list are not being used locally, and are not refcounted.
 516 *
 517 * The toss list rsb's were either
 518 * - previously used locally but not any more (were on keep list, then
 519 *   moved to toss list when last refcount dropped)
 520 * - created and put on toss list as a directory record for a lookup
 521 *   (we are the dir node for the res, but are not using the res right now,
 522 *   but some other node is)
 523 *
 524 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 525 * So, if the given rsb is on the toss list, it is moved to the keep list
 526 * before being returned.
 527 *
 528 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 529 * more refcounts exist, so the rsb is moved from the keep list to the
 530 * toss list.
 531 *
  532 * rsb's on both keep and toss lists are used for doing name to master
  533 * lookups.  rsb's that are in use locally (and being refcounted) are on
 534 * the keep list, rsb's that are not in use locally (not refcounted) and
 535 * only exist for name/master lookups are on the toss list.
 536 *
  537 * rsb's on the toss list whose dir_nodeid is not local can have stale
 538 * name/master mappings.  So, remote requests on such rsb's can potentially
 539 * return with an error, which means the mapping is stale and needs to
 540 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 541 * first_lkid is to keep only a single outstanding request on an rsb
 542 * while that rsb has a potentially stale master.)
 543 */
 544
 545static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
 546                        uint32_t hash, uint32_t b,
 547                        int dir_nodeid, int from_nodeid,
 548                        unsigned int flags, struct dlm_rsb **r_ret)
 549{
 550        struct dlm_rsb *r = NULL;
 551        int our_nodeid = dlm_our_nodeid();
 552        int from_local = 0;
 553        int from_other = 0;
 554        int from_dir = 0;
 555        int create = 0;
 556        int error;
 557
 558        if (flags & R_RECEIVE_REQUEST) {
 559                if (from_nodeid == dir_nodeid)
 560                        from_dir = 1;
 561                else
 562                        from_other = 1;
 563        } else if (flags & R_REQUEST) {
 564                from_local = 1;
 565        }
 566
 567        /*
 568         * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
 569         * from_nodeid has sent us a lock in dlm_recover_locks, believing
 570         * we're the new master.  Our local recovery may not have set
 571         * res_master_nodeid to our_nodeid yet, so allow either.  Don't
 572         * create the rsb; dlm_recover_process_copy() will handle EBADR
 573         * by resending.
 574         *
 575         * If someone sends us a request, we are the dir node, and we do
 576         * not find the rsb anywhere, then recreate it.  This happens if
 577         * someone sends us a request after we have removed/freed an rsb
 578         * from our toss list.  (They sent a request instead of lookup
 579         * because they are using an rsb from their toss list.)
 580         */
 581
 582        if (from_local || from_dir ||
 583            (from_other && (dir_nodeid == our_nodeid))) {
 584                create = 1;
 585        }
 586
 587 retry:
 588        if (create) {
 589                error = pre_rsb_struct(ls);
 590                if (error < 0)
 591                        goto out;
 592        }
 593
 594        spin_lock(&ls->ls_rsbtbl[b].lock);
 595
 596        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 597        if (error)
 598                goto do_toss;
 599        
 600        /*
 601         * rsb is active, so we can't check master_nodeid without lock_rsb.
 602         */
 603
 604        kref_get(&r->res_ref);
 605        error = 0;
 606        goto out_unlock;
 607
 608
 609 do_toss:
 610        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 611        if (error)
 612                goto do_new;
 613
 614        /*
 615         * rsb found inactive (master_nodeid may be out of date unless
 616         * we are the dir_nodeid or were the master)  No other thread
 617         * is using this rsb because it's on the toss list, so we can
 618         * look at or update res_master_nodeid without lock_rsb.
 619         */
 620
 621        if ((r->res_master_nodeid != our_nodeid) && from_other) {
 622                /* our rsb was not master, and another node (not the dir node)
 623                   has sent us a request */
 624                log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
 625                          from_nodeid, r->res_master_nodeid, dir_nodeid,
 626                          r->res_name);
 627                error = -ENOTBLK;
 628                goto out_unlock;
 629        }
 630
 631        if ((r->res_master_nodeid != our_nodeid) && from_dir) {
 632                /* don't think this should ever happen */
 633                log_error(ls, "find_rsb toss from_dir %d master %d",
 634                          from_nodeid, r->res_master_nodeid);
 635                dlm_print_rsb(r);
 636                /* fix it and go on */
 637                r->res_master_nodeid = our_nodeid;
 638                r->res_nodeid = 0;
 639                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 640                r->res_first_lkid = 0;
 641        }
 642
 643        if (from_local && (r->res_master_nodeid != our_nodeid)) {
 644                /* Because we have held no locks on this rsb,
 645                   res_master_nodeid could have become stale. */
 646                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 647                r->res_first_lkid = 0;
 648        }
 649
 650        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 651        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 652        goto out_unlock;
 653
 654
 655 do_new:
 656        /*
 657         * rsb not found
 658         */
 659
 660        if (error == -EBADR && !create)
 661                goto out_unlock;
 662
 663        error = get_rsb_struct(ls, name, len, &r);
 664        if (error == -EAGAIN) {
 665                spin_unlock(&ls->ls_rsbtbl[b].lock);
 666                goto retry;
 667        }
 668        if (error)
 669                goto out_unlock;
 670
 671        r->res_hash = hash;
 672        r->res_bucket = b;
 673        r->res_dir_nodeid = dir_nodeid;
 674        kref_init(&r->res_ref);
 675
 676        if (from_dir) {
 677                /* want to see how often this happens */
 678                log_debug(ls, "find_rsb new from_dir %d recreate %s",
 679                          from_nodeid, r->res_name);
 680                r->res_master_nodeid = our_nodeid;
 681                r->res_nodeid = 0;
 682                goto out_add;
 683        }
 684
 685        if (from_other && (dir_nodeid != our_nodeid)) {
 686                /* should never happen */
 687                log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
 688                          from_nodeid, dir_nodeid, our_nodeid, r->res_name);
 689                dlm_free_rsb(r);
 690                error = -ENOTBLK;
 691                goto out_unlock;
 692        }
 693
 694        if (from_other) {
 695                log_debug(ls, "find_rsb new from_other %d dir %d %s",
 696                          from_nodeid, dir_nodeid, r->res_name);
 697        }
 698
 699        if (dir_nodeid == our_nodeid) {
 700                /* When we are the dir nodeid, we can set the master
 701                   node immediately */
 702                r->res_master_nodeid = our_nodeid;
 703                r->res_nodeid = 0;
 704        } else {
 705                /* set_master will send_lookup to dir_nodeid */
 706                r->res_master_nodeid = 0;
 707                r->res_nodeid = -1;
 708        }
 709
 710 out_add:
 711        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 712 out_unlock:
 713        spin_unlock(&ls->ls_rsbtbl[b].lock);
 714 out:
 715        *r_ret = r;
 716        return error;
 717}
 718
 719/* During recovery, other nodes can send us new MSTCPY locks (from
 720   dlm_recover_locks) before we've made ourself master (in
 721   dlm_recover_masters). */
 722
 723static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
 724                          uint32_t hash, uint32_t b,
 725                          int dir_nodeid, int from_nodeid,
 726                          unsigned int flags, struct dlm_rsb **r_ret)
 727{
 728        struct dlm_rsb *r = NULL;
 729        int our_nodeid = dlm_our_nodeid();
 730        int recover = (flags & R_RECEIVE_RECOVER);
 731        int error;
 732
 733 retry:
 734        error = pre_rsb_struct(ls);
 735        if (error < 0)
 736                goto out;
 737
 738        spin_lock(&ls->ls_rsbtbl[b].lock);
 739
 740        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 741        if (error)
 742                goto do_toss;
 743
 744        /*
 745         * rsb is active, so we can't check master_nodeid without lock_rsb.
 746         */
 747
 748        kref_get(&r->res_ref);
 749        goto out_unlock;
 750
 751
 752 do_toss:
 753        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 754        if (error)
 755                goto do_new;
 756
 757        /*
 758         * rsb found inactive. No other thread is using this rsb because
 759         * it's on the toss list, so we can look at or update
 760         * res_master_nodeid without lock_rsb.
 761         */
 762
 763        if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
 764                /* our rsb is not master, and another node has sent us a
 765                   request; this should never happen */
 766                log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
 767                          from_nodeid, r->res_master_nodeid, dir_nodeid);
 768                dlm_print_rsb(r);
 769                error = -ENOTBLK;
 770                goto out_unlock;
 771        }
 772
 773        if (!recover && (r->res_master_nodeid != our_nodeid) &&
 774            (dir_nodeid == our_nodeid)) {
 775                /* our rsb is not master, and we are dir; may as well fix it;
 776                   this should never happen */
 777                log_error(ls, "find_rsb toss our %d master %d dir %d",
 778                          our_nodeid, r->res_master_nodeid, dir_nodeid);
 779                dlm_print_rsb(r);
 780                r->res_master_nodeid = our_nodeid;
 781                r->res_nodeid = 0;
 782        }
 783
 784        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 785        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 786        goto out_unlock;
 787
 788
 789 do_new:
 790        /*
 791         * rsb not found
 792         */
 793
 794        error = get_rsb_struct(ls, name, len, &r);
 795        if (error == -EAGAIN) {
 796                spin_unlock(&ls->ls_rsbtbl[b].lock);
 797                goto retry;
 798        }
 799        if (error)
 800                goto out_unlock;
 801
 802        r->res_hash = hash;
 803        r->res_bucket = b;
 804        r->res_dir_nodeid = dir_nodeid;
 805        r->res_master_nodeid = dir_nodeid;
 806        r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 807        kref_init(&r->res_ref);
 808
 809        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 810 out_unlock:
 811        spin_unlock(&ls->ls_rsbtbl[b].lock);
 812 out:
 813        *r_ret = r;
 814        return error;
 815}
 816
 817static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
 818                    unsigned int flags, struct dlm_rsb **r_ret)
 819{
 820        uint32_t hash, b;
 821        int dir_nodeid;
 822
 823        if (len > DLM_RESNAME_MAXLEN)
 824                return -EINVAL;
 825
 826        hash = jhash(name, len, 0);
 827        b = hash & (ls->ls_rsbtbl_size - 1);
 828
 829        dir_nodeid = dlm_hash2nodeid(ls, hash);
 830
 831        if (dlm_no_directory(ls))
 832                return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
 833                                      from_nodeid, flags, r_ret);
 834        else
 835                return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
 836                                      from_nodeid, flags, r_ret);
 837}
 838
 839/* we have received a request and found that res_master_nodeid != our_nodeid,
 840   so we need to return an error or make ourself the master */
 841
 842static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
 843                                  int from_nodeid)
 844{
 845        if (dlm_no_directory(ls)) {
 846                log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
 847                          from_nodeid, r->res_master_nodeid,
 848                          r->res_dir_nodeid);
 849                dlm_print_rsb(r);
 850                return -ENOTBLK;
 851        }
 852
 853        if (from_nodeid != r->res_dir_nodeid) {
 854                /* our rsb is not master, and another node (not the dir node)
 855                   has sent us a request.  this is much more common when our
 856                   master_nodeid is zero, so limit debug to non-zero.  */
 857
 858                if (r->res_master_nodeid) {
 859                        log_debug(ls, "validate master from_other %d master %d "
 860                                  "dir %d first %x %s", from_nodeid,
 861                                  r->res_master_nodeid, r->res_dir_nodeid,
 862                                  r->res_first_lkid, r->res_name);
 863                }
 864                return -ENOTBLK;
 865        } else {
 866                /* our rsb is not master, but the dir nodeid has sent us a
 867                   request; this could happen with master 0 / res_nodeid -1 */
 868
 869                if (r->res_master_nodeid) {
 870                        log_error(ls, "validate master from_dir %d master %d "
 871                                  "first %x %s",
 872                                  from_nodeid, r->res_master_nodeid,
 873                                  r->res_first_lkid, r->res_name);
 874                }
 875
 876                r->res_master_nodeid = dlm_our_nodeid();
 877                r->res_nodeid = 0;
 878                return 0;
 879        }
 880}
 881
 882/*
 883 * We're the dir node for this res and another node wants to know the
 884 * master nodeid.  During normal operation (non recovery) this is only
 885 * called from receive_lookup(); master lookups when the local node is
 886 * the dir node are done by find_rsb().
 887 *
 888 * normal operation, we are the dir node for a resource
 889 * . _request_lock
 890 * . set_master
 891 * . send_lookup
 892 * . receive_lookup
 893 * . dlm_master_lookup flags 0
 894 *
 895 * recover directory, we are rebuilding dir for all resources
 896 * . dlm_recover_directory
 897 * . dlm_rcom_names
 898 *   remote node sends back the rsb names it is master of and we are dir of
 899 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 900 *   we either create new rsb setting remote node as master, or find existing
 901 *   rsb and set master to be the remote node.
 902 *
 903 * recover masters, we are finding the new master for resources
 904 * . dlm_recover_masters
 905 * . recover_master
 906 * . dlm_send_rcom_lookup
 907 * . receive_rcom_lookup
 908 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
     *
     * Return value: 0 on success, with *r_nodeid set to the master nodeid
     * and, when result is non-NULL, *result set to DLM_LU_MATCH (an existing
     * rsb was found on the keep or toss tree) or DLM_LU_ADD (a new rsb was
     * created on the toss tree with from_nodeid as master).  -EINVAL is
     * returned for a name longer than DLM_RESNAME_MAXLEN, for a lookup that
     * claims to come from our own nodeid, and for a name whose dir node is
     * not us (in that last case *r_nodeid is set to -1).  Other errors are
     * passed through from pre_rsb_struct() and get_rsb_struct().
 909 */
 910
 911int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
 912                      unsigned int flags, int *r_nodeid, int *result)
 913{
 914        struct dlm_rsb *r = NULL;
 915        uint32_t hash, b;
 916        int from_master = (flags & DLM_LU_RECOVER_DIR);
 917        int fix_master = (flags & DLM_LU_RECOVER_MASTER);
 918        int our_nodeid = dlm_our_nodeid();
 919        int dir_nodeid, error, toss_list = 0;
 920
 921        if (len > DLM_RESNAME_MAXLEN)
 922                return -EINVAL;
 923
 924        if (from_nodeid == our_nodeid) {
 925                log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
 926                          our_nodeid, flags);
 927                return -EINVAL;
 928        }
 929
            /* hash selects both the rsbtbl bucket and the dir nodeid */
 930        hash = jhash(name, len, 0);
 931        b = hash & (ls->ls_rsbtbl_size - 1);
 932
 933        dir_nodeid = dlm_hash2nodeid(ls, hash);
 934        if (dir_nodeid != our_nodeid) {
 935                log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
 936                          from_nodeid, dir_nodeid, our_nodeid, hash,
 937                          ls->ls_num_nodes);
 938                *r_nodeid = -1;
 939                return -EINVAL;
 940        }
 941
            /* pre_rsb_struct() is called before the bucket spinlock is taken;
               when get_rsb_struct() below returns -EAGAIN we drop the lock and
               come back here.  Presumably this tops up a preallocated rsb
               pool outside the spinlock -- TODO confirm against
               pre_rsb_struct(). */
 942 retry:
 943        error = pre_rsb_struct(ls);
 944        if (error < 0)
 945                return error;
 946
 947        spin_lock(&ls->ls_rsbtbl[b].lock);
 948        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 949        if (!error) {
 950                /* because the rsb is active, we need to lock_rsb before
 951                   checking/changing res_master_nodeid */
 952
 953                hold_rsb(r);
 954                spin_unlock(&ls->ls_rsbtbl[b].lock);
 955                lock_rsb(r);
 956                goto found;
 957        }
 958
 959        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 960        if (error)
 961                goto not_found;
 962
 963        /* because the rsb is inactive (on toss list), it's not refcounted
 964           and lock_rsb is not used, but is protected by the rsbtbl lock */
 965
 966        toss_list = 1;
 967 found:
 968        if (r->res_dir_nodeid != our_nodeid) {
 969                /* should not happen, but may as well fix it and carry on */
 970                log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
 971                          r->res_dir_nodeid, our_nodeid, r->res_name);
 972                r->res_dir_nodeid = our_nodeid;
 973        }
 974
 975        if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
 976                /* Recovery uses this function to set a new master when
 977                   the previous master failed.  Setting NEW_MASTER will
 978                   force dlm_recover_masters to call recover_master on this
 979                   rsb even though the res_nodeid is no longer removed. */
 980
 981                r->res_master_nodeid = from_nodeid;
 982                r->res_nodeid = from_nodeid;
 983                rsb_set_flag(r, RSB_NEW_MASTER);
 984
 985                if (toss_list) {
 986                        /* I don't think we should ever find it on toss list. */
 987                        log_error(ls, "dlm_master_lookup fix_master on toss");
 988                        dlm_dump_rsb(r);
 989                }
 990        }
 991
 992        if (from_master && (r->res_master_nodeid != from_nodeid)) {
 993                /* this will happen if from_nodeid became master during
 994                   a previous recovery cycle, and we aborted the previous
 995                   cycle before recovering this master value */
 996
 997                log_limit(ls, "dlm_master_lookup from_master %d "
 998                          "master_nodeid %d res_nodeid %d first %x %s",
 999                          from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000                          r->res_first_lkid, r->res_name);
1001
                    /* we believe we are the master ourselves: log/dump the
                       conflict and leave our master value untouched */
1002                if (r->res_master_nodeid == our_nodeid) {
1003                        log_error(ls, "from_master %d our_master", from_nodeid);
1004                        dlm_dump_rsb(r);
1005                        dlm_send_rcom_lookup_dump(r, from_nodeid);
1006                        goto out_found;
1007                }
1008
1009                r->res_master_nodeid = from_nodeid;
1010                r->res_nodeid = from_nodeid;
1011                rsb_set_flag(r, RSB_NEW_MASTER);
1012        }
1013
1014        if (!r->res_master_nodeid) {
1015                /* this will happen if recovery happens while we're looking
1016                   up the master for this rsb */
1017
1018                log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019                          from_nodeid, r->res_first_lkid, r->res_name);
1020                r->res_master_nodeid = from_nodeid;
1021                r->res_nodeid = from_nodeid;
1022        }
1023
1024        if (!from_master && !fix_master &&
1025            (r->res_master_nodeid == from_nodeid)) {
1026                /* this can happen when the master sends remove, the dir node
1027                   finds the rsb on the keep list and ignores the remove,
1028                   and the former master sends a lookup */
1029
1030                log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031                          "first %x %s", from_nodeid, flags,
1032                          r->res_first_lkid, r->res_name);
1033        }
1034
            /* Report the master and release whatever was protecting the rsb:
               a toss-list rsb is still covered by the bucket spinlock, an
               active rsb by lock_rsb plus the reference from hold_rsb. */
1035 out_found:
1036        *r_nodeid = r->res_master_nodeid;
1037        if (result)
1038                *result = DLM_LU_MATCH;
1039
1040        if (toss_list) {
1041                r->res_toss_time = jiffies;
1042                /* the rsb was inactive (on toss list) */
1043                spin_unlock(&ls->ls_rsbtbl[b].lock);
1044        } else {
1045                /* the rsb was active */
1046                unlock_rsb(r);
1047                put_rsb(r);
1048        }
1049        return 0;
1050
            /* Not on the keep or toss tree (bucket spinlock still held):
               create a new rsb on the toss tree, recording from_nodeid as
               the master. */
1051 not_found:
1052        error = get_rsb_struct(ls, name, len, &r);
1053        if (error == -EAGAIN) {
1054                spin_unlock(&ls->ls_rsbtbl[b].lock);
1055                goto retry;
1056        }
1057        if (error)
1058                goto out_unlock;
1059
1060        r->res_hash = hash;
1061        r->res_bucket = b;
1062        r->res_dir_nodeid = our_nodeid;
1063        r->res_master_nodeid = from_nodeid;
1064        r->res_nodeid = from_nodeid;
1065        kref_init(&r->res_ref);
1066        r->res_toss_time = jiffies;
1067
1068        error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069        if (error) {
1070                /* should never happen */
1071                dlm_free_rsb(r);
1072                spin_unlock(&ls->ls_rsbtbl[b].lock);
1073                goto retry;
1074        }
1075
1076        if (result)
1077                *result = DLM_LU_ADD;
1078        *r_nodeid = from_nodeid;
1079        error = 0;
1080 out_unlock:
1081        spin_unlock(&ls->ls_rsbtbl[b].lock);
1082        return error;
1083}
1084
1085static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086{
1087        struct rb_node *n;
1088        struct dlm_rsb *r;
1089        int i;
1090
1091        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092                spin_lock(&ls->ls_rsbtbl[i].lock);
1093                for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095                        if (r->res_hash == hash)
1096                                dlm_dump_rsb(r);
1097                }
1098                spin_unlock(&ls->ls_rsbtbl[i].lock);
1099        }
1100}
1101
1102void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103{
1104        struct dlm_rsb *r = NULL;
1105        uint32_t hash, b;
1106        int error;
1107
1108        hash = jhash(name, len, 0);
1109        b = hash & (ls->ls_rsbtbl_size - 1);
1110
1111        spin_lock(&ls->ls_rsbtbl[b].lock);
1112        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113        if (!error)
1114                goto out_dump;
1115
1116        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117        if (error)
1118                goto out;
1119 out_dump:
1120        dlm_dump_rsb(r);
1121 out:
1122        spin_unlock(&ls->ls_rsbtbl[b].lock);
1123}
1124
1125static void toss_rsb(struct kref *kref)
1126{
1127        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128        struct dlm_ls *ls = r->res_ls;
1129
1130        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131        kref_init(&r->res_ref);
1132        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133        rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134        r->res_toss_time = jiffies;
1135        if (r->res_lvbptr) {
1136                dlm_free_lvb(r->res_lvbptr);
1137                r->res_lvbptr = NULL;
1138        }
1139}
1140
1141/* See comment for unhold_lkb */
1142
1143static void unhold_rsb(struct dlm_rsb *r)
1144{
1145        int rv;
1146        rv = kref_put(&r->res_ref, toss_rsb);
1147        DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148}
1149
1150static void kill_rsb(struct kref *kref)
1151{
1152        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153
1154        /* All work is done after the return from kref_put() so we
1155           can release the write_lock before the remove and free. */
1156
1157        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163}
1164
1165/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166   The rsb must exist as long as any lkb's for it do. */
1167
1168static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169{
1170        hold_rsb(r);
1171        lkb->lkb_resource = r;
1172}
1173
1174static void detach_lkb(struct dlm_lkb *lkb)
1175{
1176        if (lkb->lkb_resource) {
1177                put_rsb(lkb->lkb_resource);
1178                lkb->lkb_resource = NULL;
1179        }
1180}
1181
1182static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183{
1184        struct dlm_lkb *lkb;
1185        int rv, id;
1186
1187        lkb = dlm_allocate_lkb(ls);
1188        if (!lkb)
1189                return -ENOMEM;
1190
1191        lkb->lkb_nodeid = -1;
1192        lkb->lkb_grmode = DLM_LOCK_IV;
1193        kref_init(&lkb->lkb_ref);
1194        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196        INIT_LIST_HEAD(&lkb->lkb_time_list);
1197        INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198        mutex_init(&lkb->lkb_cb_mutex);
1199        INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1200
1201 retry:
1202        rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203        if (!rv)
1204                return -ENOMEM;
1205
1206        spin_lock(&ls->ls_lkbidr_spin);
1207        rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208        if (!rv)
1209                lkb->lkb_id = id;
1210        spin_unlock(&ls->ls_lkbidr_spin);
1211
1212        if (rv == -EAGAIN)
1213                goto retry;
1214
1215        if (rv < 0) {
1216                log_error(ls, "create_lkb idr error %d", rv);
1217                return rv;
1218        }
1219
1220        *lkb_ret = lkb;
1221        return 0;
1222}
1223
1224static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225{
1226        struct dlm_lkb *lkb;
1227
1228        spin_lock(&ls->ls_lkbidr_spin);
1229        lkb = idr_find(&ls->ls_lkbidr, lkid);
1230        if (lkb)
1231                kref_get(&lkb->lkb_ref);
1232        spin_unlock(&ls->ls_lkbidr_spin);
1233
1234        *lkb_ret = lkb;
1235        return lkb ? 0 : -ENOENT;
1236}
1237
1238static void kill_lkb(struct kref *kref)
1239{
1240        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241
1242        /* All work is done after the return from kref_put() so we
1243           can release the write_lock before the detach_lkb */
1244
1245        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246}
1247
1248/* __put_lkb() is used when an lkb may not have an rsb attached to
1249   it so we need to provide the lockspace explicitly */
1250
1251static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252{
1253        uint32_t lkid = lkb->lkb_id;
1254
1255        spin_lock(&ls->ls_lkbidr_spin);
1256        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257                idr_remove(&ls->ls_lkbidr, lkid);
1258                spin_unlock(&ls->ls_lkbidr_spin);
1259
1260                detach_lkb(lkb);
1261
1262                /* for local/process lkbs, lvbptr points to caller's lksb */
1263                if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264                        dlm_free_lvb(lkb->lkb_lvbptr);
1265                dlm_free_lkb(lkb);
1266                return 1;
1267        } else {
1268                spin_unlock(&ls->ls_lkbidr_spin);
1269                return 0;
1270        }
1271}
1272
1273int dlm_put_lkb(struct dlm_lkb *lkb)
1274{
1275        struct dlm_ls *ls;
1276
1277        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279
1280        ls = lkb->lkb_resource->res_ls;
1281        return __put_lkb(ls, lkb);
1282}
1283
1284/* This is only called to add a reference when the code already holds
1285   a valid reference to the lkb, so there's no need for locking. */
1286
1287static inline void hold_lkb(struct dlm_lkb *lkb)
1288{
1289        kref_get(&lkb->lkb_ref);
1290}
1291
1292/* This is called when we need to remove a reference and are certain
1293   it's not the last ref.  e.g. del_lkb is always called between a
1294   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295   put_lkb would work fine, but would involve unnecessary locking */
1296
1297static inline void unhold_lkb(struct dlm_lkb *lkb)
1298{
1299        int rv;
1300        rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302}
1303
1304static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305                            int mode)
1306{
1307        struct dlm_lkb *lkb = NULL;
1308
1309        list_for_each_entry(lkb, head, lkb_statequeue)
1310                if (lkb->lkb_rqmode < mode)
1311                        break;
1312
1313        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314}
1315
1316/* add/remove lkb to rsb's grant/convert/wait queue */
1317
1318static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319{
1320        kref_get(&lkb->lkb_ref);
1321
1322        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323
1324        lkb->lkb_timestamp = ktime_get();
1325
1326        lkb->lkb_status = status;
1327
1328        switch (status) {
1329        case DLM_LKSTS_WAITING:
1330                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332                else
1333                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334                break;
1335        case DLM_LKSTS_GRANTED:
1336                /* convention says granted locks kept in order of grmode */
1337                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338                                lkb->lkb_grmode);
1339                break;
1340        case DLM_LKSTS_CONVERT:
1341                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343                else
1344                        list_add_tail(&lkb->lkb_statequeue,
1345                                      &r->res_convertqueue);
1346                break;
1347        default:
1348                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349        }
1350}
1351
1352static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353{
1354        lkb->lkb_status = 0;
1355        list_del(&lkb->lkb_statequeue);
1356        unhold_lkb(lkb);
1357}
1358
1359static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360{
1361        hold_lkb(lkb);
1362        del_lkb(r, lkb);
1363        add_lkb(r, lkb, sts);
1364        unhold_lkb(lkb);
1365}
1366
1367static int msg_reply_type(int mstype)
1368{
1369        switch (mstype) {
1370        case DLM_MSG_REQUEST:
1371                return DLM_MSG_REQUEST_REPLY;
1372        case DLM_MSG_CONVERT:
1373                return DLM_MSG_CONVERT_REPLY;
1374        case DLM_MSG_UNLOCK:
1375                return DLM_MSG_UNLOCK_REPLY;
1376        case DLM_MSG_CANCEL:
1377                return DLM_MSG_CANCEL_REPLY;
1378        case DLM_MSG_LOOKUP:
1379                return DLM_MSG_LOOKUP_REPLY;
1380        }
1381        return -1;
1382}
1383
/*
 * Track which nodeids have already been warned about in the warned[] array
 * of num_nodes slots, where a zero slot means "unused".
 *
 * Returns 1 if nodeid is already recorded.  Otherwise returns 0, recording
 * nodeid in the first unused slot when one is reached before the end of
 * the array (a full array just yields 0 without recording anything).
 */
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
        int slot;

        /* skip over slots occupied by other nodeids */
        for (slot = 0; slot < num_nodes; slot++) {
                if (!warned[slot] || warned[slot] == nodeid)
                        break;
        }

        if (slot >= num_nodes)
                return 0;

        /* a non-zero slot at this point can only hold nodeid itself */
        if (warned[slot])
                return 1;

        warned[slot] = nodeid;
        return 0;
}
1398
1399void dlm_scan_waiters(struct dlm_ls *ls)
1400{
1401        struct dlm_lkb *lkb;
1402        ktime_t zero = ktime_set(0, 0);
1403        s64 us;
1404        s64 debug_maxus = 0;
1405        u32 debug_scanned = 0;
1406        u32 debug_expired = 0;
1407        int num_nodes = 0;
1408        int *warned = NULL;
1409
1410        if (!dlm_config.ci_waitwarn_us)
1411                return;
1412
1413        mutex_lock(&ls->ls_waiters_mutex);
1414
1415        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416                if (ktime_equal(lkb->lkb_wait_time, zero))
1417                        continue;
1418
1419                debug_scanned++;
1420
1421                us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422
1423                if (us < dlm_config.ci_waitwarn_us)
1424                        continue;
1425
1426                lkb->lkb_wait_time = zero;
1427
1428                debug_expired++;
1429                if (us > debug_maxus)
1430                        debug_maxus = us;
1431
1432                if (!num_nodes) {
1433                        num_nodes = ls->ls_num_nodes;
1434                        warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435                }
1436                if (!warned)
1437                        continue;
1438                if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439                        continue;
1440
1441                log_error(ls, "waitwarn %x %lld %d us check connection to "
1442                          "node %d", lkb->lkb_id, (long long)us,
1443                          dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444        }
1445        mutex_unlock(&ls->ls_waiters_mutex);
1446        kfree(warned);
1447
1448        if (debug_expired)
1449                log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450                          debug_scanned, debug_expired,
1451                          dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452}
1453
1454/* add/remove lkb from global waiters list of lkb's waiting for
1455   a reply from a remote node */
1456
    /* add_to_waiters: under ls_waiters_mutex, record that lkb is waiting for
       a reply to a message of type mstype sent to to_nodeid.

       Returns 0 on success.  Returns -EINVAL when an overlapping unlock is
       already pending, or when an overlapping cancel is pending and mstype
       is DLM_MSG_CANCEL.  Returns -EBUSY when another op is outstanding and
       mstype is neither DLM_MSG_UNLOCK nor DLM_MSG_CANCEL.  Every successful
       call increments lkb_wait_count and takes an lkb reference; any error
       is logged. */

1457static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458{
1459        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460        int error = 0;
1461
1462        mutex_lock(&ls->ls_waiters_mutex);
1463
1464        if (is_overlap_unlock(lkb) ||
1465            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466                error = -EINVAL;
1467                goto out;
1468        }
1469
            /* An op is already outstanding on this lkb: only an unlock or a
               cancel may overlap it.  The overlap is recorded as a flag plus
               an extra wait count/reference; lkb_wait_type is left alone and
               the lkb is not added to ls_waiters again. */
1470        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471                switch (mstype) {
1472                case DLM_MSG_UNLOCK:
1473                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474                        break;
1475                case DLM_MSG_CANCEL:
1476                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477                        break;
1478                default:
1479                        error = -EBUSY;
1480                        goto out;
1481                }
1482                lkb->lkb_wait_count++;
1483                hold_lkb(lkb);
1484
1485                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487                          lkb->lkb_wait_count, lkb->lkb_flags);
1488                goto out;
1489        }
1490
            /* Nothing outstanding: this is the first wait, so the count must
               still be zero before the lkb goes onto ls_waiters. */
1491        DLM_ASSERT(!lkb->lkb_wait_count,
1492                   dlm_print_lkb(lkb);
1493                   printk("wait_count %d\n", lkb->lkb_wait_count););
1494
1495        lkb->lkb_wait_count++;
1496        lkb->lkb_wait_type = mstype;
1497        lkb->lkb_wait_time = ktime_get();
1498        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499        hold_lkb(lkb);
1500        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501 out:
1502        if (error)
1503                log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506        mutex_unlock(&ls->ls_waiters_mutex);
1507        return error;
1508}
1509
1510/* We clear the RESEND flag because we might be taking an lkb off the waiters
1511   list as part of process_requestqueue (e.g. a lookup that has an optimized
1512   request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513   set RESEND and dlm_recover_waiters_post() */
1514
    /* _remove_from_waiters: account for a reply of type mstype on lkb.  ms is
       the reply message, or NULL when the caller has none.

       Returns 0 when one wait was consumed: lkb_wait_count is decremented,
       one lkb reference is dropped, and the lkb leaves ls_waiters once the
       count reaches zero.  Returns -1, changing nothing, when the reply does
       not match any wait recorded on the lkb.

       This function does not take ls_waiters_mutex itself; the wrappers
       below take it around the call, except for stub messages. */

1515static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516                                struct dlm_message *ms)
1517{
1518        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519        int overlap_done = 0;
1520
            /* reply to an overlapping unlock: clear the overlap flag */
1521        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524                overlap_done = 1;
1525                goto out_del;
1526        }
1527
            /* reply to an overlapping cancel: clear the overlap flag */
1528        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531                overlap_done = 1;
1532                goto out_del;
1533        }
1534
1535        /* Cancel state was preemptively cleared by a successful convert,
1536           see next comment, nothing to do. */
1537
1538        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540                log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541                          lkb->lkb_id, lkb->lkb_wait_type);
1542                return -1;
1543        }
1544
1545        /* Remove for the convert reply, and preemptively remove for the
1546           cancel reply.  A convert has been granted while there's still
1547           an outstanding cancel on it (the cancel is moot and the result
1548           in the cancel reply should be 0).  We preempt the cancel reply
1549           because the app gets the convert result and then can follow up
1550           with another op, like convert.  This subsequent op would see the
1551           lingering state of the cancel and fail with -EBUSY. */
1552
1553        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555            is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557                          lkb->lkb_id);
1558                lkb->lkb_wait_type = 0;
1559                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                    /* this decrement is for the cancel; out_del below does
                       the one for the convert itself */
1560                lkb->lkb_wait_count--;
1561                goto out_del;
1562        }
1563
1564        /* N.B. type of reply may not always correspond to type of original
1565           msg due to lookup->request optimization, verify others? */
1566
1567        if (lkb->lkb_wait_type) {
1568                lkb->lkb_wait_type = 0;
1569                goto out_del;
1570        }
1571
            /* no wait of any kind is recorded on this lkb */
1572        log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573                  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574                  mstype, lkb->lkb_flags);
1575        return -1;
1576
1577 out_del:
1578        /* the force-unlock/cancel has completed and we haven't recvd a reply
1579           to the op that was in progress prior to the unlock/cancel; we
1580           give up on any reply to the earlier op.  FIXME: not sure when/how
1581           this would happen */
1582
1583        if (overlap_done && lkb->lkb_wait_type) {
1584                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586                lkb->lkb_wait_count--;
1587                lkb->lkb_wait_type = 0;
1588        }
1589
1590        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591
1592        lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593        lkb->lkb_wait_count--;
1594        if (!lkb->lkb_wait_count)
1595                list_del_init(&lkb->lkb_wait_reply);
1596        unhold_lkb(lkb);
1597        return 0;
1598}
1599
1600static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601{
1602        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603        int error;
1604
1605        mutex_lock(&ls->ls_waiters_mutex);
1606        error = _remove_from_waiters(lkb, mstype, NULL);
1607        mutex_unlock(&ls->ls_waiters_mutex);
1608        return error;
1609}
1610
1611/* Handles situations where we might be processing a "fake" or "stub" reply in
1612   which we can't try to take waiters_mutex again. */
1613
1614static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615{
1616        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617        int error;
1618
1619        if (ms->m_flags != DLM_IFL_STUB_MS)
1620                mutex_lock(&ls->ls_waiters_mutex);
1621        error = _remove_from_waiters(lkb, ms->m_type, ms);
1622        if (ms->m_flags != DLM_IFL_STUB_MS)
1623                mutex_unlock(&ls->ls_waiters_mutex);
1624        return error;
1625}
1626
1627/* If there's an rsb for the same resource being removed, ensure
1628   that the remove message is sent before the new lookup message.
1629   It should be rare to need a delay here, but if not, then it may
1630   be worthwhile to add a proper wait mechanism rather than a delay. */
1631
1632static void wait_pending_remove(struct dlm_rsb *r)
1633{
1634        struct dlm_ls *ls = r->res_ls;
1635 restart:
1636        spin_lock(&ls->ls_remove_spin);
1637        if (ls->ls_remove_len &&
1638            !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639                log_debug(ls, "delay lookup for remove dir %d %s",
1640                          r->res_dir_nodeid, r->res_name);
1641                spin_unlock(&ls->ls_remove_spin);
1642                msleep(1);
1643                goto restart;
1644        }
1645        spin_unlock(&ls->ls_remove_spin);
1646}
1647
1648/*
1649 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650 * read by other threads in wait_pending_remove.  ls_remove_names
1651 * and ls_remove_lens are only used by the scan thread, so they do
1652 * not need protection.
1653 */
1654
1655static void shrink_bucket(struct dlm_ls *ls, int b)
1656{
1657        struct rb_node *n, *next;
1658        struct dlm_rsb *r;
1659        char *name;
1660        int our_nodeid = dlm_our_nodeid();
1661        int remote_count = 0;
1662        int i, len, rv;
1663
1664        memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665
1666        spin_lock(&ls->ls_rsbtbl[b].lock);
1667        for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668                next = rb_next(n);
1669                r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670
1671                /* If we're the directory record for this rsb, and
1672                   we're not the master of it, then we need to wait
1673                   for the master node to send us a dir remove for
1674                   before removing the dir record. */
1675
1676                if (!dlm_no_directory(ls) &&
1677                    (r->res_master_nodeid != our_nodeid) &&
1678                    (dlm_dir_nodeid(r) == our_nodeid)) {
1679                        continue;
1680                }
1681
1682                if (!time_after_eq(jiffies, r->res_toss_time +
1683                                   dlm_config.ci_toss_secs * HZ)) {
1684                        continue;
1685                }
1686
1687                if (!dlm_no_directory(ls) &&
1688                    (r->res_master_nodeid == our_nodeid) &&
1689                    (dlm_dir_nodeid(r) != our_nodeid)) {
1690
1691                        /* We're the master of this rsb but we're not
1692                           the directory record, so we need to tell the
1693                           dir node to remove the dir record. */
1694
1695                        ls->ls_remove_lens[remote_count] = r->res_length;
1696                        memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697                               DLM_RESNAME_MAXLEN);
1698                        remote_count++;
1699
1700                        if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701                                break;
1702                        continue;
1703                }
1704
1705                if (!kref_put(&r->res_ref, kill_rsb)) {
1706                        log_error(ls, "tossed rsb in use %s", r->res_name);
1707                        continue;
1708                }
1709
1710                rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711                dlm_free_rsb(r);
1712        }
1713        spin_unlock(&ls->ls_rsbtbl[b].lock);
1714
1715        /*
1716         * While searching for rsb's to free, we found some that require
1717         * remote removal.  We leave them in place and find them again here
1718         * so there is a very small gap between removing them from the toss
1719         * list and sending the removal.  Keeping this gap small is
1720         * important to keep us (the master node) from being out of sync
1721         * with the remote dir node for very long.
1722         *
1723         * From the time the rsb is removed from toss until just after
1724         * send_remove, the rsb name is saved in ls_remove_name.  A new
1725         * lookup checks this to ensure that a new lookup message for the
1726         * same resource name is not sent just before the remove message.
1727         */
1728
1729        for (i = 0; i < remote_count; i++) {
1730                name = ls->ls_remove_names[i];
1731                len = ls->ls_remove_lens[i];
1732
1733                spin_lock(&ls->ls_rsbtbl[b].lock);
1734                rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735                if (rv) {
1736                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1737                        log_debug(ls, "remove_name not toss %s", name);
1738                        continue;
1739                }
1740
1741                if (r->res_master_nodeid != our_nodeid) {
1742                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1743                        log_debug(ls, "remove_name master %d dir %d our %d %s",
1744                                  r->res_master_nodeid, r->res_dir_nodeid,
1745                                  our_nodeid, name);
1746                        continue;
1747                }
1748
1749                if (r->res_dir_nodeid == our_nodeid) {
1750                        /* should never happen */
1751                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1752                        log_error(ls, "remove_name dir %d master %d our %d %s",
1753                                  r->res_dir_nodeid, r->res_master_nodeid,
1754                                  our_nodeid, name);
1755                        continue;
1756                }
1757
1758                if (!time_after_eq(jiffies, r->res_toss_time +
1759                                   dlm_config.ci_toss_secs * HZ)) {
1760                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1761                        log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762                                  r->res_toss_time, jiffies, name);
1763                        continue;
1764                }
1765
1766                if (!kref_put(&r->res_ref, kill_rsb)) {
1767                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1768                        log_error(ls, "remove_name in use %s", name);
1769                        continue;
1770                }
1771
1772                rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774                /* block lookup of same name until we've sent remove */
1775                spin_lock(&ls->ls_remove_spin);
1776                ls->ls_remove_len = len;
1777                memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778                spin_unlock(&ls->ls_remove_spin);
1779                spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781                send_remove(r);
1782
1783                /* allow lookup of name again */
1784                spin_lock(&ls->ls_remove_spin);
1785                ls->ls_remove_len = 0;
1786                memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787                spin_unlock(&ls->ls_remove_spin);
1788
1789                dlm_free_rsb(r);
1790        }
1791}
1792
1793void dlm_scan_rsbs(struct dlm_ls *ls)
1794{
1795        int i;
1796
1797        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798                shrink_bucket(ls, i);
1799                if (dlm_locking_stopped(ls))
1800                        break;
1801                cond_resched();
1802        }
1803}
1804
1805static void add_timeout(struct dlm_lkb *lkb)
1806{
1807        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808
1809        if (is_master_copy(lkb))
1810                return;
1811
1812        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1815                goto add_it;
1816        }
1817        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1818                goto add_it;
1819        return;
1820
1821 add_it:
1822        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823        mutex_lock(&ls->ls_timeout_mutex);
1824        hold_lkb(lkb);
1825        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826        mutex_unlock(&ls->ls_timeout_mutex);
1827}
1828
1829static void del_timeout(struct dlm_lkb *lkb)
1830{
1831        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1832
1833        mutex_lock(&ls->ls_timeout_mutex);
1834        if (!list_empty(&lkb->lkb_time_list)) {
1835                list_del_init(&lkb->lkb_time_list);
1836                unhold_lkb(lkb);
1837        }
1838        mutex_unlock(&ls->ls_timeout_mutex);
1839}
1840
1841/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1843   and then lock rsb because of lock ordering in add_timeout.  We may need
1844   to specify some special timeout-related bits in the lkb that are just to
1845   be accessed under the timeout_mutex. */
1846
1847void dlm_scan_timeout(struct dlm_ls *ls)
1848{
1849        struct dlm_rsb *r;
1850        struct dlm_lkb *lkb;
1851        int do_cancel, do_warn;
1852        s64 wait_us;
1853
1854        for (;;) {
1855                if (dlm_locking_stopped(ls))
1856                        break;
1857
1858                do_cancel = 0;
1859                do_warn = 0;
1860                mutex_lock(&ls->ls_timeout_mutex);
1861                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1862
1863                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864                                                        lkb->lkb_timestamp));
1865
1866                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867                            wait_us >= (lkb->lkb_timeout_cs * 10000))
1868                                do_cancel = 1;
1869
1870                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
1872                                do_warn = 1;
1873
1874                        if (!do_cancel && !do_warn)
1875                                continue;
1876                        hold_lkb(lkb);
1877                        break;
1878                }
1879                mutex_unlock(&ls->ls_timeout_mutex);
1880
1881                if (!do_cancel && !do_warn)
1882                        break;
1883
1884                r = lkb->lkb_resource;
1885                hold_rsb(r);
1886                lock_rsb(r);
1887
1888                if (do_warn) {
1889                        /* clear flag so we only warn once */
1890                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1892                                del_timeout(lkb);
1893                        dlm_timeout_warn(lkb);
1894                }
1895
1896                if (do_cancel) {
1897                        log_debug(ls, "timeout cancel %x node %d %s",
1898                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1901                        del_timeout(lkb);
1902                        _cancel_lock(r, lkb);
1903                }
1904
1905                unlock_rsb(r);
1906                unhold_rsb(r);
1907                dlm_put_lkb(lkb);
1908        }
1909}
1910
1911/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912   dlm_recoverd before checking/setting ls_recover_begin. */
1913
1914void dlm_adjust_timeouts(struct dlm_ls *ls)
1915{
1916        struct dlm_lkb *lkb;
1917        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1918
1919        ls->ls_recover_begin = 0;
1920        mutex_lock(&ls->ls_timeout_mutex);
1921        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923        mutex_unlock(&ls->ls_timeout_mutex);
1924
1925        if (!dlm_config.ci_waitwarn_us)
1926                return;
1927
1928        mutex_lock(&ls->ls_waiters_mutex);
1929        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930                if (ktime_to_us(lkb->lkb_wait_time))
1931                        lkb->lkb_wait_time = ktime_get();
1932        }
1933        mutex_unlock(&ls->ls_waiters_mutex);
1934}
1935
1936/* lkb is master or local copy */
1937
1938static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1939{
1940        int b, len = r->res_ls->ls_lvblen;
1941
1942        /* b=1 lvb returned to caller
1943           b=0 lvb written to rsb or invalidated
1944           b=-1 do nothing */
1945
1946        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1947
1948        if (b == 1) {
1949                if (!lkb->lkb_lvbptr)
1950                        return;
1951
1952                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1953                        return;
1954
1955                if (!r->res_lvbptr)
1956                        return;
1957
1958                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959                lkb->lkb_lvbseq = r->res_lvbseq;
1960
1961        } else if (b == 0) {
1962                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963                        rsb_set_flag(r, RSB_VALNOTVALID);
1964                        return;
1965                }
1966
1967                if (!lkb->lkb_lvbptr)
1968                        return;
1969
1970                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1971                        return;
1972
1973                if (!r->res_lvbptr)
1974                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1975
1976                if (!r->res_lvbptr)
1977                        return;
1978
1979                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1980                r->res_lvbseq++;
1981                lkb->lkb_lvbseq = r->res_lvbseq;
1982                rsb_clear_flag(r, RSB_VALNOTVALID);
1983        }
1984
1985        if (rsb_flag(r, RSB_VALNOTVALID))
1986                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1987}
1988
1989static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1990{
1991        if (lkb->lkb_grmode < DLM_LOCK_PW)
1992                return;
1993
1994        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995                rsb_set_flag(r, RSB_VALNOTVALID);
1996                return;
1997        }
1998
1999        if (!lkb->lkb_lvbptr)
2000                return;
2001
2002        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2003                return;
2004
2005        if (!r->res_lvbptr)
2006                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2007
2008        if (!r->res_lvbptr)
2009                return;
2010
2011        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2012        r->res_lvbseq++;
2013        rsb_clear_flag(r, RSB_VALNOTVALID);
2014}
2015
2016/* lkb is process copy (pc) */
2017
2018static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019                            struct dlm_message *ms)
2020{
2021        int b;
2022
2023        if (!lkb->lkb_lvbptr)
2024                return;
2025
2026        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2027                return;
2028
2029        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2030        if (b == 1) {
2031                int len = receive_extralen(ms);
2032                if (len > DLM_RESNAME_MAXLEN)
2033                        len = DLM_RESNAME_MAXLEN;
2034                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035                lkb->lkb_lvbseq = ms->m_lvbseq;
2036        }
2037}
2038
2039/* Manipulate lkb's on rsb's convert/granted/waiting queues
2040   remove_lock -- used for unlock, removes lkb from granted
2041   revert_lock -- used for cancel, moves lkb from convert to granted
2042   grant_lock  -- used for request and convert, adds lkb to granted or
2043                  moves lkb from convert or waiting to granted
2044
2045   Each of these is used for master or local copy lkb's.  There is
2046   also a _pc() variation used to make the corresponding change on
2047   a process copy (pc) lkb. */
2048
2049static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050{
2051        del_lkb(r, lkb);
2052        lkb->lkb_grmode = DLM_LOCK_IV;
2053        /* this unhold undoes the original ref from create_lkb()
2054           so this leads to the lkb being freed */
2055        unhold_lkb(lkb);
2056}
2057
/*
 * Remove a master or local copy lkb: apply the unlock lvb rule to the rsb
 * first, then take the lkb off its queue and drop its reference.
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* lvb handling must see the lkb's granted mode before it is reset */
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
2063
/*
 * Process-copy variant of remove_lock(): only the queue removal and
 * reference drop are done here, with no lvb handling.
 */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
2068
2069/* returns: 0 did nothing
2070            1 moved lock to granted
2071           -1 removed lock */
2072
2073static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2074{
2075        int rv = 0;
2076
2077        lkb->lkb_rqmode = DLM_LOCK_IV;
2078
2079        switch (lkb->lkb_status) {
2080        case DLM_LKSTS_GRANTED:
2081                break;
2082        case DLM_LKSTS_CONVERT:
2083                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2084                rv = 1;
2085                break;
2086        case DLM_LKSTS_WAITING:
2087                del_lkb(r, lkb);
2088                lkb->lkb_grmode = DLM_LOCK_IV;
2089                /* this unhold undoes the original ref from create_lkb()
2090                   so this leads to the lkb being freed */
2091                unhold_lkb(lkb);
2092                rv = -1;
2093                break;
2094        default:
2095                log_print("invalid status for revert %d", lkb->lkb_status);
2096        }
2097        return rv;
2098}
2099
/*
 * Process-copy variant of revert_lock(); it simply forwards to
 * revert_lock() and hands back its return value.
 */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = revert_lock(r, lkb);

	return rv;
}
2104
2105static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106{
2107        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108                lkb->lkb_grmode = lkb->lkb_rqmode;
2109                if (lkb->lkb_status)
2110                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2111                else
2112                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2113        }
2114
2115        lkb->lkb_rqmode = DLM_LOCK_IV;
2116        lkb->lkb_highbast = 0;
2117}
2118
/*
 * Grant a master or local copy lkb: apply the lvb rule for the grant
 * against the rsb, then update the lkb's modes and queue.
 */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* lvb handling reads grmode/rqmode, so it must run before they change */
	set_lvb_lock(r, lkb);

	_grant_lock(r, lkb);
}
2124
/*
 * Grant a process-copy lkb: take the lvb from message ms where the lvb
 * rule calls for it, then update the lkb's modes and queue.
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	/* lvb handling reads grmode/rqmode, so it must run before they change */
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
2131
2132/* called by grant_pending_locks() which means an async grant message must
2133   be sent to the requesting node in addition to granting the lock if the
2134   lkb belongs to a remote node. */
2135
/*
 * Grant a lock that had been queued, then notify its owner: a master-copy
 * lkb gets a grant message sent for it, any other lkb gets a completion
 * callback queued with status 0.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
2144
2145/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146   change the granted/requested modes.  We're munging things accordingly in
2147   the process copy.
2148   CONVDEADLK: our grmode may have been forced down to NL to resolve a
2149   conversion deadlock
2150   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151   compatible with other granted locks */
2152
2153static void munge_demoted(struct dlm_lkb *lkb)
2154{
2155        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156                log_print("munge_demoted %x invalid modes gr %d rq %d",
2157                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2158                return;
2159        }
2160
2161        lkb->lkb_grmode = DLM_LOCK_NL;
2162}
2163
2164static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2165{
2166        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167            ms->m_type != DLM_MSG_GRANT) {
2168                log_print("munge_altmode %x invalid reply type %d",
2169                          lkb->lkb_id, ms->m_type);
2170                return;
2171        }
2172
2173        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174                lkb->lkb_rqmode = DLM_LOCK_PR;
2175        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176                lkb->lkb_rqmode = DLM_LOCK_CW;
2177        else {
2178                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2179                dlm_print_lkb(lkb);
2180        }
2181}
2182
2183static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2184{
2185        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2186                                           lkb_statequeue);
2187        if (lkb->lkb_id == first->lkb_id)
2188                return 1;
2189
2190        return 0;
2191}
2192
2193/* Check if the given lkb conflicts with another lkb on the queue. */
2194
2195static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2196{
2197        struct dlm_lkb *this;
2198
2199        list_for_each_entry(this, head, lkb_statequeue) {
2200                if (this == lkb)
2201                        continue;
2202                if (!modes_compat(this, lkb))
2203                        return 1;
2204        }
2205        return 0;
2206}
2207
2208/*
2209 * "A conversion deadlock arises with a pair of lock requests in the converting
2210 * queue for one resource.  The granted mode of each lock blocks the requested
2211 * mode of the other lock."
2212 *
2213 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214 * convert queue from being granted, then deadlk/demote lkb.
2215 *
2216 * Example:
2217 * Granted Queue: empty
2218 * Convert Queue: NL->EX (first lock)
2219 *                PR->EX (second lock)
2220 *
2221 * The first lock can't be granted because of the granted mode of the second
2222 * lock and the second lock can't be granted because it's not first in the
2223 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225 * flag set and return DEMOTED in the lksb flags.
2226 *
2227 * Originally, this function detected conv-deadlk in a more limited scope:
2228 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229 * - if lkb1 was the first entry in the queue (not just earlier), and was
2230 *   blocked by the granted mode of lkb2, and there was nothing on the
2231 *   granted queue preventing lkb1 from being granted immediately, i.e.
2232 *   lkb2 was the only thing preventing lkb1 from being granted.
2233 *
2234 * That second condition meant we'd only say there was conv-deadlk if
2235 * resolving it (by demotion) would lead to the first lock on the convert
2236 * queue being granted right away.  It allowed conversion deadlocks to exist
2237 * between locks on the convert queue while they couldn't be granted anyway.
2238 *
2239 * Now, we detect and take action on conversion deadlocks immediately when
2240 * they're created, even if they may not be immediately consequential.  If
2241 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242 * mode that would prevent lkb1's conversion from being granted, we do a
2243 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244 * I think this means that the lkb_is_ahead condition below should always
2245 * be zero, i.e. there will never be conv-deadlk between two locks that are
2246 * both already on the convert queue.
2247 */
2248
2249static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2250{
2251        struct dlm_lkb *lkb1;
2252        int lkb_is_ahead = 0;
2253
2254        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2255                if (lkb1 == lkb2) {
2256                        lkb_is_ahead = 1;
2257                        continue;
2258                }
2259
2260                if (!lkb_is_ahead) {
2261                        if (!modes_compat(lkb2, lkb1))
2262                                return 1;
2263                } else {
2264                        if (!modes_compat(lkb2, lkb1) &&
2265                            !modes_compat(lkb1, lkb2))
2266                                return 1;
2267                }
2268        }
2269        return 0;
2270}
2271
2272/*
2273 * Return 1 if the lock can be granted, 0 otherwise.
2274 * Also detect and resolve conversion deadlocks.
2275 *
2276 * lkb is the lock to be granted
2277 *
2278 * now is 1 if the function is being called in the context of the
2279 * immediate request, it is 0 if called later, after the lock has been
2280 * queued.
2281 *
2282 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2283 * after recovery.
2284 *
2285 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2286 */
2287
2288static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2289                           int recover)
2290{
2291        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2292
2293        /*
2294         * 6-10: Version 5.4 introduced an option to address the phenomenon of
2295         * a new request for a NL mode lock being blocked.
2296         *
2297         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2298         * request, then it would be granted.  In essence, the use of this flag
2299         * tells the Lock Manager to expedite theis request by not considering
2300         * what may be in the CONVERTING or WAITING queues...  As of this
2301         * writing, the EXPEDITE flag can be used only with new requests for NL
2302         * mode locks.  This flag is not valid for conversion requests.
2303         *
2304         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2305         * conversion or used with a non-NL requested mode.  We also know an
2306         * EXPEDITE request is always granted immediately, so now must always
2307         * be 1.  The full condition to grant an expedite request: (now &&
2308         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2309         * therefore be shortened to just checking the flag.
2310         */
2311
2312        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2313                return 1;
2314
2315        /*
2316         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2317         * added to the remaining conditions.
2318         */
2319
2320        if (queue_conflict(&r->res_grantqueue, lkb))
2321                return 0;
2322
2323        /*
2324         * 6-3: By default, a conversion request is immediately granted if the
2325         * requested mode is compatible with the modes of all other granted
2326         * locks
2327         */
2328
2329        if (queue_conflict(&r->res_convertqueue, lkb))
2330                return 0;
2331
2332        /*
2333         * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2334         * locks for a recovered rsb, on which lkb's have been rebuilt.
2335         * The lkb's may have been rebuilt on the queues in a different
2336         * order than they were in on the previous master.  So, granting
2337         * queued conversions in order after recovery doesn't make sense
2338         * since the order hasn't been preserved anyway.  The new order
2339         * could also have created a new "in place" conversion deadlock.
2340         * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2341         * After recovery, there would be no granted locks, and possibly
2342         * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2343         * recovery, grant conversions without considering order.
2344         */
2345
2346        if (conv && recover)
2347                return 1;
2348
2349        /*
2350         * 6-5: But the default algorithm for deciding whether to grant or
2351         * queue conversion requests does not by itself guarantee that such
2352         * requests are serviced on a "first come first serve" basis.  This, in
2353         * turn, can lead to a phenomenon known as "indefinate postponement".
2354         *
2355         * 6-7: This issue is dealt with by using the optional QUECVT flag with
2356         * the system service employed to request a lock conversion.  This flag
2357         * forces certain conversion requests to be queued, even if they are
2358         * compatible with the granted modes of other locks on the same
2359         * resource.  Thus, the use of this flag results in conversion requests
2360         * being ordered on a "first come first servce" basis.
2361         *
2362         * DCT: This condition is all about new conversions being able to occur
2363         * "in place" while the lock remains on the granted queue (assuming
2364         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2365         * doesn't _have_ to go onto the convert queue where it's processed in
2366         * order.  The "now" variable is necessary to distinguish converts
2367         * being received and processed for the first time now, because once a
2368         * convert is moved to the conversion queue the condition below applies
2369         * requiring fifo granting.
2370         */
2371
2372        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2373                return 1;
2374
2375        /*
2376         * Even if the convert is compat with all granted locks,
2377         * QUECVT forces it behind other locks on the convert queue.
2378         */
2379
2380        if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2381                if (list_empty(&r->res_convertqueue))
2382                        return 1;
2383                else
2384                        return 0;
2385        }
2386
2387        /*
2388         * The NOORDER flag is set to avoid the standard vms rules on grant
2389         * order.
2390         */
2391
2392        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2393                return 1;
2394
2395        /*
2396         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2397         * granted until all other conversion requests ahead of it are granted
2398         * and/or canceled.
2399         */
2400
2401        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2402                return 1;
2403
2404        /*
2405         * 6-4: By default, a new request is immediately granted only if all
2406         * three of the following conditions are satisfied when the request is
2407         * issued:
2408         * - The queue of ungranted conversion requests for the resource is
2409         *   empty.
2410         * - The queue of ungranted new requests for the resource is empty.
2411         * - The mode of the new request is compatible with the most
2412         *   restrictive mode of all granted locks on the resource.
2413         */
2414
2415        if (now && !conv && list_empty(&r->res_convertqueue) &&
2416            list_empty(&r->res_waitqueue))
2417                return 1;
2418
2419        /*
2420         * 6-4: Once a lock request is in the queue of ungranted new requests,
2421         * it cannot be granted until the queue of ungranted conversion
2422         * requests is empty, all ungranted new requests ahead of it are
2423         * granted and/or canceled, and it is compatible with the granted mode
2424         * of the most restrictive lock granted on the resource.
2425         */
2426
2427        if (!now && !conv && list_empty(&r->res_convertqueue) &&
2428            first_in_list(lkb, &r->res_waitqueue))
2429                return 1;
2430
2431        return 0;
2432}
2433
2434static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2435                          int recover, int *err)
2436{
2437        int rv;
2438        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2439        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2440
2441        if (err)
2442                *err = 0;
2443
2444        rv = _can_be_granted(r, lkb, now, recover);
2445        if (rv)
2446                goto out;
2447
2448        /*
2449         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2450         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2451         * cancels one of the locks.
2452         */
2453
2454        if (is_convert && can_be_queued(lkb) &&
2455            conversion_deadlock_detect(r, lkb)) {
2456                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2457                        lkb->lkb_grmode = DLM_LOCK_NL;
2458                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2459                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2460                        if (err)
2461                                *err = -EDEADLK;
2462                        else {
2463                                log_print("can_be_granted deadlock %x now %d",
2464                                          lkb->lkb_id, now);
2465                                dlm_dump_rsb(r);
2466                        }
2467                }
2468                goto out;
2469        }
2470
2471        /*
2472         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2473         * to grant a request in a mode other than the normal rqmode.  It's a
2474         * simple way to provide a big optimization to applications that can
2475         * use them.
2476         */
2477
2478        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2479                alt = DLM_LOCK_PR;
2480        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2481                alt = DLM_LOCK_CW;
2482
2483        if (alt) {
2484                lkb->lkb_rqmode = alt;
2485                rv = _can_be_granted(r, lkb, now, 0);
2486                if (rv)
2487                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2488                else
2489                        lkb->lkb_rqmode = rqmode;
2490        }
2491 out:
2492        return rv;
2493}
2494
2495/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2496   for locks pending on the convert list.  Once verified (watch for these
2497   log_prints), we should be able to just call _can_be_granted() and not
2498   bother with the demote/deadlk cases here (and there's no easy way to deal
2499   with a deadlk here, we'd have to generate something like grant_lock with
2500   the deadlk error.) */
2501
2502/* Returns the highest requested mode of all blocked conversions; sets
2503   cw if there's a blocked conversion to DLM_LOCK_CW. */
2504
2505static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2506                                 unsigned int *count)
2507{
2508        struct dlm_lkb *lkb, *s;
2509        int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2510        int hi, demoted, quit, grant_restart, demote_restart;
2511        int deadlk;
2512
2513        quit = 0;
2514 restart:
2515        grant_restart = 0;
2516        demote_restart = 0;
2517        hi = DLM_LOCK_IV;
2518
2519        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2520                demoted = is_demoted(lkb);
2521                deadlk = 0;
2522
2523                if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2524                        grant_lock_pending(r, lkb);
2525                        grant_restart = 1;
2526                        if (count)
2527                                (*count)++;
2528                        continue;
2529                }
2530
2531                if (!demoted && is_demoted(lkb)) {
2532                        log_print("WARN: pending demoted %x node %d %s",
2533                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2534                        demote_restart = 1;
2535                        continue;
2536                }
2537
2538                if (deadlk) {
2539                        log_print("WARN: pending deadlock %x node %d %s",
2540                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2541                        dlm_dump_rsb(r);
2542                        continue;
2543                }
2544
2545                hi = max_t(int, lkb->lkb_rqmode, hi);
2546
2547                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2548                        *cw = 1;
2549        }
2550
2551        if (grant_restart)
2552                goto restart;
2553        if (demote_restart && !quit) {
2554                quit = 1;
2555                goto restart;
2556        }
2557
2558        return max_t(int, high, hi);
2559}
2560
2561static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2562                              unsigned int *count)
2563{
2564        struct dlm_lkb *lkb, *s;
2565
2566        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2567                if (can_be_granted(r, lkb, 0, 0, NULL)) {
2568                        grant_lock_pending(r, lkb);
2569                        if (count)
2570                                (*count)++;
2571                } else {
2572                        high = max_t(int, lkb->lkb_rqmode, high);
2573                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
2574                                *cw = 1;
2575                }
2576        }
2577
2578        return high;
2579}
2580
2581/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2582   on either the convert or waiting queue.
2583   high is the largest rqmode of all locks blocked on the convert or
2584   waiting queue. */
2585
2586static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2587{
2588        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2589                if (gr->lkb_highbast < DLM_LOCK_EX)
2590                        return 1;
2591                return 0;
2592        }
2593
2594        if (gr->lkb_highbast < high &&
2595            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2596                return 1;
2597        return 0;
2598}
2599
2600static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2601{
2602        struct dlm_lkb *lkb, *s;
2603        int high = DLM_LOCK_IV;
2604        int cw = 0;
2605
2606        if (!is_master(r)) {
2607                log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2608                dlm_dump_rsb(r);
2609                return;
2610        }
2611
2612        high = grant_pending_convert(r, high, &cw, count);
2613        high = grant_pending_wait(r, high, &cw, count);
2614
2615        if (high == DLM_LOCK_IV)
2616                return;
2617
2618        /*
2619         * If there are locks left on the wait/convert queue then send blocking
2620         * ASTs to granted locks based on the largest requested mode (high)
2621         * found above.
2622         */
2623
2624        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2625                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2626                        if (cw && high == DLM_LOCK_PR &&
2627                            lkb->lkb_grmode == DLM_LOCK_PR)
2628                                queue_bast(r, lkb, DLM_LOCK_CW);
2629                        else
2630                                queue_bast(r, lkb, high);
2631                        lkb->lkb_highbast = high;
2632                }
2633        }
2634}
2635
2636static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2637{
2638        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2639            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2640                if (gr->lkb_highbast < DLM_LOCK_EX)
2641                        return 1;
2642                return 0;
2643        }
2644
2645        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2646                return 1;
2647        return 0;
2648}
2649
2650static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2651                            struct dlm_lkb *lkb)
2652{
2653        struct dlm_lkb *gr;
2654
2655        list_for_each_entry(gr, head, lkb_statequeue) {
2656                /* skip self when sending basts to convertqueue */
2657                if (gr == lkb)
2658                        continue;
2659                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2660                        queue_bast(r, gr, lkb->lkb_rqmode);
2661                        gr->lkb_highbast = lkb->lkb_rqmode;
2662                }
2663        }
2664}
2665
2666static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2667{
2668        send_bast_queue(r, &r->res_grantqueue, lkb);
2669}
2670
2671static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2672{
2673        send_bast_queue(r, &r->res_grantqueue, lkb);
2674        send_bast_queue(r, &r->res_convertqueue, lkb);
2675}
2676
2677/* set_master(r, lkb) -- set the master nodeid of a resource
2678
2679   The purpose of this function is to set the nodeid field in the given
2680   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2681   known, it can just be copied to the lkb and the function will return
2682   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2683   before it can be copied to the lkb.
2684
2685   When the rsb nodeid is being looked up remotely, the initial lkb
2686   causing the lookup is kept on the ls_waiters list waiting for the
2687   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2688   on the rsb's res_lookup list until the master is verified.
2689
2690   Return values:
2691   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2692   1: the rsb master is not available and the lkb has been placed on
2693      a wait queue
2694*/
2695
2696static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2697{
2698        int our_nodeid = dlm_our_nodeid();
2699
2700        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2701                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2702                r->res_first_lkid = lkb->lkb_id;
2703                lkb->lkb_nodeid = r->res_nodeid;
2704                return 0;
2705        }
2706
2707        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2708                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2709                return 1;
2710        }
2711
2712        if (r->res_master_nodeid == our_nodeid) {
2713                lkb->lkb_nodeid = 0;
2714                return 0;
2715        }
2716
2717        if (r->res_master_nodeid) {
2718                lkb->lkb_nodeid = r->res_master_nodeid;
2719                return 0;
2720        }
2721
2722        if (dlm_dir_nodeid(r) == our_nodeid) {
2723                /* This is a somewhat unusual case; find_rsb will usually
2724                   have set res_master_nodeid when dir nodeid is local, but
2725                   there are cases where we become the dir node after we've
2726                   past find_rsb and go through _request_lock again.
2727                   confirm_master() or process_lookup_list() needs to be
2728                   called after this. */
2729                log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2730                          lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2731                          r->res_name);
2732                r->res_master_nodeid = our_nodeid;
2733                r->res_nodeid = 0;
2734                lkb->lkb_nodeid = 0;
2735                return 0;
2736        }
2737
2738        wait_pending_remove(r);
2739
2740        r->res_first_lkid = lkb->lkb_id;
2741        send_lookup(r, lkb);
2742        return 1;
2743}
2744
2745static void process_lookup_list(struct dlm_rsb *r)
2746{
2747        struct dlm_lkb *lkb, *safe;
2748
2749        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2750                list_del_init(&lkb->lkb_rsb_lookup);
2751                _request_lock(r, lkb);
2752                schedule();
2753        }
2754}
2755
2756/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2757
2758static void confirm_master(struct dlm_rsb *r, int error)
2759{
2760        struct dlm_lkb *lkb;
2761
2762        if (!r->res_first_lkid)
2763                return;
2764
2765        switch (error) {
2766        case 0:
2767        case -EINPROGRESS:
2768                r->res_first_lkid = 0;
2769                process_lookup_list(r);
2770                break;
2771
2772        case -EAGAIN:
2773        case -EBADR:
2774        case -ENOTBLK:
2775                /* the remote request failed and won't be retried (it was
2776                   a NOQUEUE, or has been canceled/unlocked); make a waiting
2777                   lkb the first_lkid */
2778
2779                r->res_first_lkid = 0;
2780
2781                if (!list_empty(&r->res_lookup)) {
2782                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2783                                         lkb_rsb_lookup);
2784                        list_del_init(&lkb->lkb_rsb_lookup);
2785                        r->res_first_lkid = lkb->lkb_id;
2786                        _request_lock(r, lkb);
2787                }
2788                break;
2789
2790        default:
2791                log_error(r->res_ls, "confirm_master unknown error %d", error);
2792        }
2793}
2794
2795static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2796                         int namelen, unsigned long timeout_cs,
2797                         void (*ast) (void *astparam),
2798                         void *astparam,
2799                         void (*bast) (void *astparam, int mode),
2800                         struct dlm_args *args)
2801{
2802        int rv = -EINVAL;
2803
2804        /* check for invalid arg usage */
2805
2806        if (mode < 0 || mode > DLM_LOCK_EX)
2807                goto out;
2808
2809        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2810                goto out;
2811
2812        if (flags & DLM_LKF_CANCEL)
2813                goto out;
2814
2815        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2816                goto out;
2817
2818        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2819                goto out;
2820
2821        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2822                goto out;
2823
2824        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2825                goto out;
2826
2827        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2828                goto out;
2829
2830        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2831                goto out;
2832
2833        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2834                goto out;
2835
2836        if (!ast || !lksb)
2837                goto out;
2838
2839        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2840                goto out;
2841
2842        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2843                goto out;
2844
2845        /* these args will be copied to the lkb in validate_lock_args,
2846           it cannot be done now because when converting locks, fields in
2847           an active lkb cannot be modified before locking the rsb */
2848
2849        args->flags = flags;
2850        args->astfn = ast;
2851        args->astparam = astparam;
2852        args->bastfn = bast;
2853        args->timeout = timeout_cs;
2854        args->mode = mode;
2855        args->lksb = lksb;
2856        rv = 0;
2857 out:
2858        return rv;
2859}
2860
2861static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2862{
2863        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2864                      DLM_LKF_FORCEUNLOCK))
2865                return -EINVAL;
2866
2867        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2868                return -EINVAL;
2869
2870        args->flags = flags;
2871        args->astparam = astarg;
2872        return 0;
2873}
2874
2875static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2876                              struct dlm_args *args)
2877{
2878        int rv = -EINVAL;
2879
2880        if (args->flags & DLM_LKF_CONVERT) {
2881                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2882                        goto out;
2883
2884                if (args->flags & DLM_LKF_QUECVT &&
2885                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2886                        goto out;
2887
2888                rv = -EBUSY;
2889                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2890                        goto out;
2891
2892                if (lkb->lkb_wait_type)
2893                        goto out;
2894
2895                if (is_overlap(lkb))
2896                        goto out;
2897        }
2898
2899        lkb->lkb_exflags = args->flags;
2900        lkb->lkb_sbflags = 0;
2901        lkb->lkb_astfn = args->astfn;
2902        lkb->lkb_astparam = args->astparam;
2903        lkb->lkb_bastfn = args->bastfn;
2904        lkb->lkb_rqmode = args->mode;
2905        lkb->lkb_lksb = args->lksb;
2906        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2907        lkb->lkb_ownpid = (int) current->pid;
2908        lkb->lkb_timeout_cs = args->timeout;
2909        rv = 0;
2910 out:
2911        if (rv)
2912                log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2913                          rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2914                          lkb->lkb_status, lkb->lkb_wait_type,
2915                          lkb->lkb_resource->res_name);
2916        return rv;
2917}
2918
2919/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2920   for success */
2921
/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/force-unlock on it */
2925
2926static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2927{
2928        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2929        int rv = -EINVAL;
2930
2931        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2932                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2933                dlm_print_lkb(lkb);
2934                goto out;
2935        }
2936
2937        /* an lkb may still exist even though the lock is EOL'ed due to a
2938           cancel, unlock or failed noqueue request; an app can't use these
2939           locks; return same error as if the lkid had not been found at all */
2940
2941        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2942                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2943                rv = -ENOENT;
2944                goto out;
2945        }
2946
2947        /* an lkb may be waiting for an rsb lookup to complete where the
2948           lookup was initiated by another lock */
2949
2950        if (!list_empty(&lkb->lkb_rsb_lookup)) {
2951                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2952                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2953                        list_del_init(&lkb->lkb_rsb_lookup);
2954                        queue_cast(lkb->lkb_resource, lkb,
2955                                   args->flags & DLM_LKF_CANCEL ?
2956                                   -DLM_ECANCEL : -DLM_EUNLOCK);
2957                        unhold_lkb(lkb); /* undoes create_lkb() */
2958                }
2959                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2960                rv = -EBUSY;
2961                goto out;
2962        }
2963
2964        /* cancel not allowed with another cancel/unlock in progress */
2965
2966        if (args->flags & DLM_LKF_CANCEL) {
2967                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2968                        goto out;
2969
2970                if (is_overlap(lkb))
2971                        goto out;
2972
2973                /* don't let scand try to do a cancel */
2974                del_timeout(lkb);
2975
2976                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2977                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2978                        rv = -EBUSY;
2979                        goto out;
2980                }
2981
2982                /* there's nothing to cancel */
2983                if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2984                    !lkb->lkb_wait_type) {
2985                        rv = -EBUSY;
2986                        goto out;
2987                }
2988
2989                switch (lkb->lkb_wait_type) {
2990                case DLM_MSG_LOOKUP:
2991                case DLM_MSG_REQUEST:
2992                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2993                        rv = -EBUSY;
2994                        goto out;
2995                case DLM_MSG_UNLOCK:
2996                case DLM_MSG_CANCEL:
2997                        goto out;
2998                }
2999                /* add_to_waiters() will set OVERLAP_CANCEL */
3000                goto out_ok;
3001        }
3002
3003        /* do we need to allow a force-unlock if there's a normal unlock
3004           already in progress?  in what conditions could the normal unlock
3005           fail such that we'd want to send a force-unlock to be sure? */
3006
3007        if (args->flags & DLM_LKF_FORCEUNLOCK) {
3008                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3009                        goto out;
3010
3011                if (is_overlap_unlock(lkb))
3012                        goto out;
3013
3014                /* don't let scand try to do a cancel */
3015                del_timeout(lkb);
3016
3017                if (lkb->lkb_flags & DLM_IFL_RESEND) {
3018                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3019                        rv = -EBUSY;
3020                        goto out;
3021                }
3022
3023                switch (lkb->lkb_wait_type) {
3024                case DLM_MSG_LOOKUP:
3025                case DLM_MSG_REQUEST:
3026                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3027                        rv = -EBUSY;
3028                        goto out;
3029                case DLM_MSG_UNLOCK:
3030                        goto out;
3031                }
3032                /* add_to_waiters() will set OVERLAP_UNLOCK */
3033                goto out_ok;
3034        }
3035
3036        /* normal unlock not allowed if there's any op in progress */
3037        rv = -EBUSY;
3038        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3039                goto out;
3040
3041 out_ok:
3042        /* an overlapping op shouldn't blow away exflags from other op */
3043        lkb->lkb_exflags |= args->flags;
3044        lkb->lkb_sbflags = 0;
3045        lkb->lkb_astparam = args->astparam;
3046        rv = 0;
3047 out:
3048        if (rv)
3049                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3050                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3051                          args->flags, lkb->lkb_wait_type,
3052                          lkb->lkb_resource->res_name);
3053        return rv;
3054}
3055
3056/*
3057 * Four stage 4 varieties:
3058 * do_request(), do_convert(), do_unlock(), do_cancel()
3059 * These are called on the master node for the given lock and
3060 * from the central locking logic.
3061 */
3062
3063static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064{
3065        int error = 0;
3066
3067        if (can_be_granted(r, lkb, 1, 0, NULL)) {
3068                grant_lock(r, lkb);
3069                queue_cast(r, lkb, 0);
3070                goto out;
3071        }
3072
3073        if (can_be_queued(lkb)) {
3074                error = -EINPROGRESS;
3075                add_lkb(r, lkb, DLM_LKSTS_WAITING);
3076                add_timeout(lkb);
3077                goto out;
3078        }
3079
3080        error = -EAGAIN;
3081        queue_cast(r, lkb, -EAGAIN);
3082 out:
3083        return error;
3084}
3085
/*
 * Follow-up work after do_request(): send blocking ASTs according to its
 * result.
 *
 * error: do_request()'s return value
 *        -EINPROGRESS: basts go to the grant queue
 *        -EAGAIN:      basts go to the grant and convert queues, but only
 *                      if force_blocking_asts() says so for lkb
 *        otherwise:    nothing
 */
static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == -EINPROGRESS)
		send_blocking_asts(r, lkb);
	else if (error == -EAGAIN && force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
}
3099
3100static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3101{
3102        int error = 0;
3103        int deadlk = 0;
3104
3105        /* changing an existing lock may allow others to be granted */
3106
3107        if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3108                grant_lock(r, lkb);
3109                queue_cast(r, lkb, 0);
3110                goto out;
3111        }
3112
3113        /* can_be_granted() detected that this lock would block in a conversion
3114           deadlock, so we leave it on the granted queue and return EDEADLK in
3115           the ast for the convert. */
3116
3117        if (deadlk) {
3118                /* it's left on the granted queue */
3119                revert_lock(r, lkb);
3120                queue_cast(r, lkb, -EDEADLK);
3121                error = -EDEADLK;
3122                goto out;
3123        }
3124
3125        /* is_demoted() means the can_be_granted() above set the grmode
3126           to NL, and left us on the granted queue.  This auto-demotion
3127           (due to CONVDEADLK) might mean other locks, and/or this lock, are
3128           now grantable.  We have to try to grant other converting locks
3129           before we try again to grant this one. */
3130
3131        if (is_demoted(lkb)) {
3132                grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3133                if (_can_be_granted(r, lkb, 1, 0)) {
3134                        grant_lock(r, lkb);
3135                        queue_cast(r, lkb, 0);
3136                        goto out;
3137                }
3138                /* else fall through and move to convert queue */
3139        }
3140
3141        if (can_be_queued(lkb)) {
3142                error = -EINPROGRESS;
3143                del_lkb(r, lkb);
3144                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3145                add_timeout(lkb);
3146                goto out;
3147        }
3148
3149        error = -EAGAIN;
3150        queue_cast(r, lkb, -EAGAIN);
3151 out:
3152        return error;
3153}
3154
/*
 * Follow-up work after do_convert(), selected by its result.
 *
 * error: do_convert()'s return value
 *        0:            grant pending locks (which also sends basts)
 *        -EINPROGRESS: basts go to the grant queue
 *        -EAGAIN:      basts go to the grant and convert queues, but only
 *                      if force_blocking_asts() says so for lkb
 *        otherwise:    nothing
 */
static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == 0)
		grant_pending_locks(r, NULL);
	else if (error == -EINPROGRESS)
		send_blocking_asts(r, lkb);
	else if (error == -EAGAIN && force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
}
3172
3173static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3174{
3175        remove_lock(r, lkb);
3176        queue_cast(r, lkb, -DLM_EUNLOCK);
3177        return -DLM_EUNLOCK;
3178}
3179
/*
 * Follow-up work after do_unlock(): try to grant pending locks on r.
 * lkb and error are not consulted.
 */
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	/* no grant counter is wanted here, hence NULL */
	grant_pending_locks(r, NULL);
}
3185
3186/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3187
3188static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3189{
3190        int error;
3191
3192        error = revert_lock(r, lkb);
3193        if (error) {
3194                queue_cast(r, lkb, -DLM_ECANCEL);
3195                return -DLM_ECANCEL;
3196        }
3197        return 0;
3198}
3199
/*
 * Follow-up work after do_cancel(): when the cancel did something (error is
 * non-zero), try to grant pending locks on r.
 */
static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (!error)
		return;

	grant_pending_locks(r, NULL);
}
3206
3207/*
3208 * Four stage 3 varieties:
3209 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3210 */
3211
3212/* add a new lkb to a possibly new rsb, called by requesting process */
3213
/*
 * Stage 3 of a new request: settle the master via set_master(), then either
 * send the request to the remote master or run it locally.
 *
 * Returns a negative set_master() result unchanged, 0 when set_master()
 * parked the lkb (positive result), and otherwise the result of
 * send_request() or do_request().
 */
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */
	error = set_master(r, lkb);
	if (error < 0)
		return error;
	if (error > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	error = do_request(r, lkb);
	do_request_effects(r, lkb, error);
	return error;
}
3240
3241/* change some property of an existing lkb, e.g. mode */
3242
/*
 * Stage 3 of a conversion: send it to the remote master, or run
 * do_convert() and its effects locally.  Returns the result of
 * send_convert() or do_convert().
 */
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	error = do_convert(r, lkb);
	do_convert_effects(r, lkb, error);
	return error;
}
3259
3260/* remove an existing lkb from the granted queue */
3261
/*
 * Stage 3 of an unlock: send it to the remote master, or run do_unlock()
 * and its effects locally.  Returns the result of send_unlock() or
 * do_unlock().
 */
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	error = do_unlock(r, lkb);
	do_unlock_effects(r, lkb, error);
	return error;
}
3278
3279/* remove an existing lkb from the convert or wait queue */
3280
/*
 * Stage 3 of a cancel: send it to the remote master, or run do_cancel()
 * and its effects locally.  Returns the result of send_cancel() or
 * do_cancel().
 */
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	error = do_cancel(r, lkb);
	do_cancel_effects(r, lkb, error);
	return error;
}
3297
3298/*
3299 * Four stage 2 varieties:
3300 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3301 */
3302
3303static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3304                        int len, struct dlm_args *args)
3305{
3306        struct dlm_rsb *r;
3307        int error;
3308
3309        error = validate_lock_args(ls, lkb, args);
3310        if (error)
3311                return error;
3312
3313        error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3314        if (error)
3315                return error;
3316
3317        lock_rsb(r);
3318
3319        attach_lkb(r, lkb);
3320        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3321
3322        error = _request_lock(r, lkb);
3323
3324        unlock_rsb(r);
3325        put_rsb(r);
3326        return error;
3327}
3328
3329static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330                        struct dlm_args *args)
3331{
3332        struct dlm_rsb *r;
3333        int error;
3334
3335        r = lkb->lkb_resource;
3336
3337        hold_rsb(r);
3338        lock_rsb(r);
3339
3340        error = validate_lock_args(ls, lkb, args);
3341        if (error)
3342                goto out;
3343
3344        error = _convert_lock(r, lkb);
3345 out:
3346        unlock_rsb(r);
3347        put_rsb(r);
3348        return error;
3349}
3350
3351static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352                       struct dlm_args *args)
3353{
3354        struct dlm_rsb *r;
3355        int error;
3356
3357        r = lkb->lkb_resource;
3358
3359        hold_rsb(r);
3360        lock_rsb(r);
3361
3362        error = validate_unlock_args(lkb, args);
3363        if (error)
3364                goto out;
3365
3366        error = _unlock_lock(r, lkb);
3367 out:
3368        unlock_rsb(r);
3369        put_rsb(r);
3370        return error;
3371}
3372
3373static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3374                       struct dlm_args *args)
3375{
3376        struct dlm_rsb *r;
3377        int error;
3378
3379        r = lkb->lkb_resource;
3380
3381        hold_rsb(r);
3382        lock_rsb(r);
3383
3384        error = validate_unlock_args(lkb, args);
3385        if (error)
3386                goto out;
3387
3388        error = _cancel_lock(r, lkb);
3389 out:
3390        unlock_rsb(r);
3391        put_rsb(r);
3392        return error;
3393}
3394
3395/*
3396 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3397 */
3398
/* Stage 1 entry point for both new requests and conversions.

   With DLM_LKF_CONVERT set in flags the existing lkb is looked up from
   lksb->sb_lkid and passed to convert_lock(); otherwise a new lkb is
   created and passed to request_lock() together with name/namelen.
   parent_lkid is accepted but not used in this function.

   Returns -EINVAL if the lockspace is not found, otherwise the result of
   the lower stages, except that -EINPROGRESS, -EAGAIN and -EDEADLK are
   reported to the caller as 0. */

int dlm_lock(dlm_lockspace_t *lockspace,
             int mode,
             struct dlm_lksb *lksb,
             uint32_t flags,
             void *name,
             unsigned int namelen,
             uint32_t parent_lkid,
             void (*ast) (void *astarg),
             void *astarg,
             void (*bast) (void *astarg, int mode))
{
        struct dlm_ls *ls;
        struct dlm_lkb *lkb;
        struct dlm_args args;
        int error, convert = flags & DLM_LKF_CONVERT;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;

        dlm_lock_recovery(ls);

        /* NOTE(review): lksb is dereferenced here before set_lock_args()
           has had a chance to look at it - confirm callers never pass a
           NULL lksb together with DLM_LKF_CONVERT */
        if (convert)
                error = find_lkb(ls, lksb->sb_lkid, &lkb);
        else
                error = create_lkb(ls, &lkb);

        if (error)
                goto out;

        error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
                              astarg, bast, &args);
        if (error)
                goto out_put;

        if (convert)
                error = convert_lock(ls, lkb, &args);
        else
                error = request_lock(ls, lkb, name, namelen, &args);

        /* the operation is still in progress; not an error for the caller */
        if (error == -EINPROGRESS)
                error = 0;
 out_put:
        /* The put is decided on the error value before -EAGAIN/-EDEADLK
           are rewritten below, so those results still put the lkb.
           Presumably this drops the reference find_lkb() took for a
           conversion, or disposes of a new lkb whose request failed -
           TODO confirm against find_lkb()/create_lkb(). */
        if (convert || error)
                __put_lkb(ls, lkb);
        if (error == -EAGAIN || error == -EDEADLK)
                error = 0;
 out:
        dlm_unlock_recovery(ls);
        dlm_put_lockspace(ls);
        return error;
}
3451
3452int dlm_unlock(dlm_lockspace_t *lockspace,
3453               uint32_t lkid,
3454               uint32_t flags,
3455               struct dlm_lksb *lksb,
3456               void *astarg)
3457{
3458        struct dlm_ls *ls;
3459        struct dlm_lkb *lkb;
3460        struct dlm_args args;
3461        int error;
3462
3463        ls = dlm_find_lockspace_local(lockspace);
3464        if (!ls)
3465                return -EINVAL;
3466
3467        dlm_lock_recovery(ls);
3468
3469        error = find_lkb(ls, lkid, &lkb);
3470        if (error)
3471                goto out;
3472
3473        error = set_unlock_args(flags, astarg, &args);
3474        if (error)
3475                goto out_put;
3476
3477        if (flags & DLM_LKF_CANCEL)
3478                error = cancel_lock(ls, lkb, &args);
3479        else
3480                error = unlock_lock(ls, lkb, &args);
3481
3482        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3483                error = 0;
3484        if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3485                error = 0;
3486 out_put:
3487        dlm_put_lkb(lkb);
3488 out:
3489        dlm_unlock_recovery(ls);
3490        dlm_put_lockspace(ls);
3491        return error;
3492}
3493
3494/*
3495 * send/receive routines for remote operations and replies
3496 *
3497 * send_args
3498 * send_common
3499 * send_request                 receive_request
3500 * send_convert                 receive_convert
3501 * send_unlock                  receive_unlock
3502 * send_cancel                  receive_cancel
3503 * send_grant                   receive_grant
3504 * send_bast                    receive_bast
3505 * send_lookup                  receive_lookup
3506 * send_remove                  receive_remove
3507 *
3508 *                              send_common_reply
3509 * receive_request_reply        send_request_reply
3510 * receive_convert_reply        send_convert_reply
3511 * receive_unlock_reply         send_unlock_reply
3512 * receive_cancel_reply         send_cancel_reply
3513 * receive_lookup_reply         send_lookup_reply
3514 */
3515
3516static int _create_message(struct dlm_ls *ls, int mb_len,
3517                           int to_nodeid, int mstype,
3518                           struct dlm_message **ms_ret,
3519                           struct dlm_mhandle **mh_ret)
3520{
3521        struct dlm_message *ms;
3522        struct dlm_mhandle *mh;
3523        char *mb;
3524
3525        /* get_buffer gives us a message handle (mh) that we need to
3526           pass into lowcomms_commit and a message buffer (mb) that we
3527           write our data into */
3528
3529        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3530        if (!mh)
3531                return -ENOBUFS;
3532
3533        memset(mb, 0, mb_len);
3534
3535        ms = (struct dlm_message *) mb;
3536
3537        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3538        ms->m_header.h_lockspace = ls->ls_global_id;
3539        ms->m_header.h_nodeid = dlm_our_nodeid();
3540        ms->m_header.h_length = mb_len;
3541        ms->m_header.h_cmd = DLM_MSG;
3542
3543        ms->m_type = mstype;
3544
3545        *mh_ret = mh;
3546        *ms_ret = ms;
3547        return 0;
3548}
3549
3550static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551                          int to_nodeid, int mstype,
3552                          struct dlm_message **ms_ret,
3553                          struct dlm_mhandle **mh_ret)
3554{
3555        int mb_len = sizeof(struct dlm_message);
3556
3557        switch (mstype) {
3558        case DLM_MSG_REQUEST:
3559        case DLM_MSG_LOOKUP:
3560        case DLM_MSG_REMOVE:
3561                mb_len += r->res_length;
3562                break;
3563        case DLM_MSG_CONVERT:
3564        case DLM_MSG_UNLOCK:
3565        case DLM_MSG_REQUEST_REPLY:
3566        case DLM_MSG_CONVERT_REPLY:
3567        case DLM_MSG_GRANT:
3568                if (lkb && lkb->lkb_lvbptr)
3569                        mb_len += r->res_ls->ls_lvblen;
3570                break;
3571        }
3572
3573        return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3574                               ms_ret, mh_ret);
3575}
3576
/* Run dlm_message_out() on the message and commit its lowcomms buffer.
   This always returns 0 today; the int return is kept because further
   lowcomms enhancements or alternate implementations may make the
   return value useful at some point. */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
        dlm_message_out(ms);
        dlm_lowcomms_commit_buffer(mh);

        return 0;
}
3586
3587static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3588                      struct dlm_message *ms)
3589{
3590        ms->m_nodeid   = lkb->lkb_nodeid;
3591        ms->m_pid      = lkb->lkb_ownpid;
3592        ms->m_lkid     = lkb->lkb_id;
3593        ms->m_remid    = lkb->lkb_remid;
3594        ms->m_exflags  = lkb->lkb_exflags;
3595        ms->m_sbflags  = lkb->lkb_sbflags;
3596        ms->m_flags    = lkb->lkb_flags;
3597        ms->m_lvbseq   = lkb->lkb_lvbseq;
3598        ms->m_status   = lkb->lkb_status;
3599        ms->m_grmode   = lkb->lkb_grmode;
3600        ms->m_rqmode   = lkb->lkb_rqmode;
3601        ms->m_hash     = r->res_hash;
3602
3603        /* m_result and m_bastmode are set from function args,
3604           not from lkb fields */
3605
3606        if (lkb->lkb_bastfn)
3607                ms->m_asts |= DLM_CB_BAST;
3608        if (lkb->lkb_astfn)
3609                ms->m_asts |= DLM_CB_CAST;
3610
3611        /* compare with switch in create_message; send_remove() doesn't
3612           use send_args() */
3613
3614        switch (ms->m_type) {
3615        case DLM_MSG_REQUEST:
3616        case DLM_MSG_LOOKUP:
3617                memcpy(ms->m_extra, r->res_name, r->res_length);
3618                break;
3619        case DLM_MSG_CONVERT:
3620        case DLM_MSG_UNLOCK:
3621        case DLM_MSG_REQUEST_REPLY:
3622        case DLM_MSG_CONVERT_REPLY:
3623        case DLM_MSG_GRANT:
3624                if (!lkb->lkb_lvbptr)
3625                        break;
3626                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3627                break;
3628        }
3629}
3630
3631static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3632{
3633        struct dlm_message *ms;
3634        struct dlm_mhandle *mh;
3635        int to_nodeid, error;
3636
3637        to_nodeid = r->res_nodeid;
3638
3639        error = add_to_waiters(lkb, mstype, to_nodeid);
3640        if (error)
3641                return error;
3642
3643        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3644        if (error)
3645                goto fail;
3646
3647        send_args(r, lkb, ms);
3648
3649        error = send_message(mh, ms);
3650        if (error)
3651                goto fail;
3652        return 0;
3653
3654 fail:
3655        remove_from_waiters(lkb, msg_reply_type(mstype));
3656        return error;
3657}
3658
3659static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3660{
3661        return send_common(r, lkb, DLM_MSG_REQUEST);
3662}
3663
3664static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665{
3666        int error;
3667
3668        error = send_common(r, lkb, DLM_MSG_CONVERT);
3669
3670        /* down conversions go without a reply from the master */
3671        if (!error && down_conversion(lkb)) {
3672                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3673                r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3674                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3675                r->res_ls->ls_stub_ms.m_result = 0;
3676                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3677        }
3678
3679        return error;
3680}
3681
3682/* FIXME: if this lkb is the only lock we hold on the rsb, then set
3683   MASTER_UNCERTAIN to force the next request on the rsb to confirm
3684   that the master is still correct. */
3685
3686static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3687{
3688        return send_common(r, lkb, DLM_MSG_UNLOCK);
3689}
3690
3691static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3692{
3693        return send_common(r, lkb, DLM_MSG_CANCEL);
3694}
3695
3696static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697{
3698        struct dlm_message *ms;
3699        struct dlm_mhandle *mh;
3700        int to_nodeid, error;
3701
3702        to_nodeid = lkb->lkb_nodeid;
3703
3704        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3705        if (error)
3706                goto out;
3707
3708        send_args(r, lkb, ms);
3709
3710        ms->m_result = 0;
3711
3712        error = send_message(mh, ms);
3713 out:
3714        return error;
3715}
3716
3717static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3718{
3719        struct dlm_message *ms;
3720        struct dlm_mhandle *mh;
3721        int to_nodeid, error;
3722
3723        to_nodeid = lkb->lkb_nodeid;
3724
3725        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3726        if (error)
3727                goto out;
3728
3729        send_args(r, lkb, ms);
3730
3731        ms->m_bastmode = mode;
3732
3733        error = send_message(mh, ms);
3734 out:
3735        return error;
3736}
3737
3738static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3739{
3740        struct dlm_message *ms;
3741        struct dlm_mhandle *mh;
3742        int to_nodeid, error;
3743
3744        to_nodeid = dlm_dir_nodeid(r);
3745
3746        error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3747        if (error)
3748                return error;
3749
3750        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3751        if (error)
3752                goto fail;
3753
3754        send_args(r, lkb, ms);
3755
3756        error = send_message(mh, ms);
3757        if (error)
3758                goto fail;
3759        return 0;
3760
3761 fail:
3762        remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3763        return error;
3764}
3765
3766static int send_remove(struct dlm_rsb *r)
3767{
3768        struct dlm_message *ms;
3769        struct dlm_mhandle *mh;
3770        int to_nodeid, error;
3771
3772        to_nodeid = dlm_dir_nodeid(r);
3773
3774        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3775        if (error)
3776                goto out;
3777
3778        memcpy(ms->m_extra, r->res_name, r->res_length);
3779        ms->m_hash = r->res_hash;
3780
3781        error = send_message(mh, ms);
3782 out:
3783        return error;
3784}
3785
3786static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3787                             int mstype, int rv)
3788{
3789        struct dlm_message *ms;
3790        struct dlm_mhandle *mh;
3791        int to_nodeid, error;
3792
3793        to_nodeid = lkb->lkb_nodeid;
3794
3795        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3796        if (error)
3797                goto out;
3798
3799        send_args(r, lkb, ms);
3800
3801        ms->m_result = rv;
3802
3803        error = send_message(mh, ms);
3804 out:
3805        return error;
3806}
3807
3808static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809{
3810        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3811}
3812
3813static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3814{
3815        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3816}
3817
3818static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819{
3820        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3821}
3822
3823static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824{
3825        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3826}
3827
3828static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3829                             int ret_nodeid, int rv)
3830{
3831        struct dlm_rsb *r = &ls->ls_stub_rsb;
3832        struct dlm_message *ms;
3833        struct dlm_mhandle *mh;
3834        int error, nodeid = ms_in->m_header.h_nodeid;
3835
3836        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3837        if (error)
3838                goto out;
3839
3840        ms->m_lkid = ms_in->m_lkid;
3841        ms->m_result = rv;
3842        ms->m_nodeid = ret_nodeid;
3843
3844        error = send_message(mh, ms);
3845 out:
3846        return error;
3847}
3848
3849/* which args we save from a received message depends heavily on the type
3850   of message, unlike the send side where we can safely send everything about
3851   the lkb for any type of message */
3852
3853static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3854{
3855        lkb->lkb_exflags = ms->m_exflags;
3856        lkb->lkb_sbflags = ms->m_sbflags;
3857        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3858                         (ms->m_flags & 0x0000FFFF);
3859}
3860
3861static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3862{
3863        if (ms->m_flags == DLM_IFL_STUB_MS)
3864                return;
3865
3866        lkb->lkb_sbflags = ms->m_sbflags;
3867        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868                         (ms->m_flags & 0x0000FFFF);
3869}
3870
3871static int receive_extralen(struct dlm_message *ms)
3872{
3873        return (ms->m_header.h_length - sizeof(struct dlm_message));
3874}
3875
3876static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3877                       struct dlm_message *ms)
3878{
3879        int len;
3880
3881        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3882                if (!lkb->lkb_lvbptr)
3883                        lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3884                if (!lkb->lkb_lvbptr)
3885                        return -ENOMEM;
3886                len = receive_extralen(ms);
3887                if (len > DLM_RESNAME_MAXLEN)
3888                        len = DLM_RESNAME_MAXLEN;
3889                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3890        }
3891        return 0;
3892}
3893
/* Non-NULL placeholder stored in lkb_bastfn by receive_request_args()
   when the requesting node registered a bast; only logs if invoked. */

static void fake_bastfn(void *astparam, int mode)
{
        log_print("fake_bastfn should not be called");
}
3898
/* Non-NULL placeholder stored in lkb_astfn by receive_request_args()
   when the requesting node registered an ast; only logs if invoked. */

static void fake_astfn(void *astparam)
{
        log_print("fake_astfn should not be called");
}
3903
3904static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905                                struct dlm_message *ms)
3906{
3907        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3908        lkb->lkb_ownpid = ms->m_pid;
3909        lkb->lkb_remid = ms->m_lkid;
3910        lkb->lkb_grmode = DLM_LOCK_IV;
3911        lkb->lkb_rqmode = ms->m_rqmode;
3912
3913        lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3914        lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3915
3916        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3917                /* lkb was just created so there won't be an lvb yet */
3918                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3919                if (!lkb->lkb_lvbptr)
3920                        return -ENOMEM;
3921        }
3922
3923        return 0;
3924}
3925
3926static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3927                                struct dlm_message *ms)
3928{
3929        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3930                return -EBUSY;
3931
3932        if (receive_lvb(ls, lkb, ms))
3933                return -ENOMEM;
3934
3935        lkb->lkb_rqmode = ms->m_rqmode;
3936        lkb->lkb_lvbseq = ms->m_lvbseq;
3937
3938        return 0;
3939}
3940
/* Take the unlock arguments from a received message, i.e. just the lvb.
   Any receive_lvb() failure is reported as -ENOMEM. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                               struct dlm_message *ms)
{
        return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3948
3949/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3950   uses to send a reply and that the remote end uses to process the reply. */
3951
3952static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3953{
3954        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3955        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3956        lkb->lkb_remid = ms->m_lkid;
3957}
3958
3959/* This is called after the rsb is locked so that we can safely inspect
3960   fields in the lkb. */
3961
3962static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3963{
3964        int from = ms->m_header.h_nodeid;
3965        int error = 0;
3966
3967        switch (ms->m_type) {
3968        case DLM_MSG_CONVERT:
3969        case DLM_MSG_UNLOCK:
3970        case DLM_MSG_CANCEL:
3971                if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3972                        error = -EINVAL;
3973                break;
3974
3975        case DLM_MSG_CONVERT_REPLY:
3976        case DLM_MSG_UNLOCK_REPLY:
3977        case DLM_MSG_CANCEL_REPLY:
3978        case DLM_MSG_GRANT:
3979        case DLM_MSG_BAST:
3980                if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3981                        error = -EINVAL;
3982                break;
3983
3984        case DLM_MSG_REQUEST_REPLY:
3985                if (!is_process_copy(lkb))
3986                        error = -EINVAL;
3987                else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3988                        error = -EINVAL;
3989                break;
3990
3991        default:
3992                error = -EINVAL;
3993        }
3994
3995        if (error)
3996                log_error(lkb->lkb_resource->res_ls,
3997                          "ignore invalid message %d from %d %x %x %x %d",
3998                          ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3999                          lkb->lkb_flags, lkb->lkb_nodeid);
4000        return error;
4001}
4002
/* Send another DLM_MSG_REMOVE for the named resource to its directory
   node, but only if no rsb with that name is currently on this node's
   keep or toss tree.  While the message is being created and sent, the
   name is published in ls_remove_name/ls_remove_len; both are cleared
   again after the send.

   NOTE(review): len is copied into a DLM_RESNAME_MAXLEN + 1 byte buffer
   without a range check here - callers must guarantee
   len <= DLM_RESNAME_MAXLEN. */

static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
{
        char name[DLM_RESNAME_MAXLEN + 1];
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        struct dlm_rsb *r;
        uint32_t hash, b;
        int rv, dir_nodeid;

        /* zero-filled, so the copy is always nul-terminated for logging */
        memset(name, 0, sizeof(name));
        memcpy(name, ms_name, len);

        hash = jhash(name, len, 0);
        b = hash & (ls->ls_rsbtbl_size - 1);

        dir_nodeid = dlm_hash2nodeid(ls, hash);

        log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);

        spin_lock(&ls->ls_rsbtbl[b].lock);
        /* a zero return from the search means the rsb was found */
        rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
        if (!rv) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                log_error(ls, "repeat_remove on keep %s", name);
                return;
        }

        rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
        if (!rv) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                log_error(ls, "repeat_remove on toss %s", name);
                return;
        }

        /* use ls->remove_name2 to avoid conflict with shrink? */

        /* the name is published while the bucket lock is still held */
        spin_lock(&ls->ls_remove_spin);
        ls->ls_remove_len = len;
        memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
        spin_unlock(&ls->ls_remove_spin);
        spin_unlock(&ls->ls_rsbtbl[b].lock);

        /* NOTE(review): if the message cannot be created we return with
           ls_remove_name/ls_remove_len still set - confirm intended */
        rv = _create_message(ls, sizeof(struct dlm_message) + len,
                             dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
        if (rv)
                return;

        memcpy(ms->m_extra, name, len);
        ms->m_hash = hash;

        send_message(mh, ms);

        spin_lock(&ls->ls_remove_spin);
        ls->ls_remove_len = 0;
        memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
        spin_unlock(&ls->ls_remove_spin);
}
4060
/* Handle a lock request received from another node: create a master-copy
   lkb for it, find the rsb named in m_extra, run do_request() and send
   the result back in a request reply.

   Returns 0 once do_request() has run (its result travels in the reply).
   If setup fails before that, a reply carrying the error is sent using
   the lockspace's stub rsb/lkb and the error is returned. */

static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int from_nodeid;
        int error, namelen = 0;

        from_nodeid = ms->m_header.h_nodeid;

        error = create_lkb(ls, &lkb);
        if (error)
                goto fail;

        receive_flags(lkb, ms);
        lkb->lkb_flags |= DLM_IFL_MSTCPY;
        error = receive_request_args(ls, lkb, ms);
        if (error) {
                __put_lkb(ls, lkb);
                goto fail;
        }

        /* The dir node is the authority on whether we are the master
           for this rsb or not, so if the master sends us a request, we should
           recreate the rsb if we've destroyed it.   This race happens when we
           send a remove message to the dir node at the same time that the dir
           node sends us a request for the rsb. */

        /* namelen stays 0 for failures above, which keeps the fail path
           from sending a repeat remove for them */
        namelen = receive_extralen(ms);

        error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
                         R_RECEIVE_REQUEST, &r);
        if (error) {
                __put_lkb(ls, lkb);
                goto fail;
        }

        lock_rsb(r);

        if (r->res_master_nodeid != dlm_our_nodeid()) {
                error = validate_master_nodeid(ls, r, from_nodeid);
                if (error) {
                        unlock_rsb(r);
                        put_rsb(r);
                        __put_lkb(ls, lkb);
                        goto fail;
                }
        }

        attach_lkb(r, lkb);
        error = do_request(r, lkb);
        /* the reply is sent before the effects of the request are applied */
        send_request_reply(r, lkb, error);
        do_request_effects(r, lkb, error);

        unlock_rsb(r);
        put_rsb(r);

        /* any result other than 0 or -EINPROGRESS puts the lkb */
        if (error == -EINPROGRESS)
                error = 0;
        if (error)
                dlm_put_lkb(lkb);
        return 0;

 fail:
        /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
           and do this receive_request again from process_lookup_list once
           we get the lookup reply.  This would avoid many repeated
           ENOTBLK request failures when the lookup reply designating us
           as master is delayed. */

        /* We could repeatedly return -EBADR here if our send_remove() is
           delayed in being sent/arriving/being processed on the dir node.
           Another node would repeatedly look up the master, and the dir
           node would continue returning our nodeid until our send_remove
           took effect.

           We send another remove message in case our previous send_remove
           was lost/ignored/missed somehow. */

        if (error != -ENOTBLK) {
                log_limit(ls, "receive_request %x from %d %d",
                          ms->m_lkid, from_nodeid, error);
        }

        if (namelen && error == -EBADR) {
                send_repeat_remove(ls, ms->m_extra, namelen);
                msleep(1000);
        }

        /* no usable lkb/rsb on this path, so reply through the stubs */
        setup_stub_lkb(ls, ms);
        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
        return error;
}
4153
4154static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4155{
4156        struct dlm_lkb *lkb;
4157        struct dlm_rsb *r;
4158        int error, reply = 1;
4159
4160        error = find_lkb(ls, ms->m_remid, &lkb);
4161        if (error)
4162                goto fail;
4163
4164        if (lkb->lkb_remid != ms->m_lkid) {
4165                log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4166                          "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4167                          (unsigned long long)lkb->lkb_recover_seq,
4168                          ms->m_header.h_nodeid, ms->m_lkid);
4169                error = -ENOENT;
4170                goto fail;
4171        }
4172
4173        r = lkb->lkb_resource;
4174
4175        hold_rsb(r);
4176        lock_rsb(r);
4177
4178        error = validate_message(lkb, ms);
4179        if (error)
4180                goto out;
4181
4182        receive_flags(lkb, ms);
4183
4184        error = receive_convert_args(ls, lkb, ms);
4185        if (error) {
4186                send_convert_reply(r, lkb, error);
4187                goto out;
4188        }
4189
4190        reply = !down_conversion(lkb);
4191
4192        error = do_convert(r, lkb);
4193        if (reply)
4194                send_convert_reply(r, lkb, error);
4195        do_convert_effects(r, lkb, error);
4196 out:
4197        unlock_rsb(r);
4198        put_rsb(r);
4199        dlm_put_lkb(lkb);
4200        return 0;
4201
4202 fail:
4203        setup_stub_lkb(ls, ms);
4204        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4205        return error;
4206}
4207
4208static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4209{
4210        struct dlm_lkb *lkb;
4211        struct dlm_rsb *r;
4212        int error;
4213
4214        error = find_lkb(ls, ms->m_remid, &lkb);
4215        if (error)
4216                goto fail;
4217
4218        if (lkb->lkb_remid != ms->m_lkid) {
4219                log_error(ls, "receive_unlock %x remid %x remote %d %x",
4220                          lkb->lkb_id, lkb->lkb_remid,
4221                          ms->m_header.h_nodeid, ms->m_lkid);
4222                error = -ENOENT;
4223                goto fail;
4224        }
4225
4226        r = lkb->lkb_resource;
4227
4228        hold_rsb(r);
4229        lock_rsb(r);
4230
4231        error = validate_message(lkb, ms);
4232        if (error)
4233                goto out;
4234
4235        receive_flags(lkb, ms);
4236
4237        error = receive_unlock_args(ls, lkb, ms);
4238        if (error) {
4239                send_unlock_reply(r, lkb, error);
4240                goto out;
4241        }
4242
4243        error = do_unlock(r, lkb);
4244        send_unlock_reply(r, lkb, error);
4245        do_unlock_effects(r, lkb, error);
4246 out:
4247        unlock_rsb(r);
4248        put_rsb(r);
4249        dlm_put_lkb(lkb);
4250        return 0;
4251
4252 fail:
4253        setup_stub_lkb(ls, ms);
4254        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4255        return error;
4256}
4257
4258static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4259{
4260        struct dlm_lkb *lkb;
4261        struct dlm_rsb *r;
4262        int error;
4263
4264        error = find_lkb(ls, ms->m_remid, &lkb);
4265        if (error)
4266                goto fail;
4267
4268        receive_flags(lkb, ms);
4269
4270        r = lkb->lkb_resource;
4271
4272        hold_rsb(r);
4273        lock_rsb(r);
4274
4275        error = validate_message(lkb, ms);
4276        if (error)
4277                goto out;
4278
4279        error = do_cancel(r, lkb);
4280        send_cancel_reply(r, lkb, error);
4281        do_cancel_effects(r, lkb, error);
4282 out:
4283        unlock_rsb(r);
4284        put_rsb(r);
4285        dlm_put_lkb(lkb);
4286        return 0;
4287
4288 fail:
4289        setup_stub_lkb(ls, ms);
4290        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4291        return error;
4292}
4293
4294static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4295{
4296        struct dlm_lkb *lkb;
4297        struct dlm_rsb *r;
4298        int error;
4299
4300        error = find_lkb(ls, ms->m_remid, &lkb);
4301        if (error)
4302                return error;
4303
4304        r = lkb->lkb_resource;
4305
4306        hold_rsb(r);
4307        lock_rsb(r);
4308
4309        error = validate_message(lkb, ms);
4310        if (error)
4311                goto out;
4312
4313        receive_flags_reply(lkb, ms);
4314        if (is_altmode(lkb))
4315                munge_altmode(lkb, ms);
4316        grant_lock_pc(r, lkb, ms);
4317        queue_cast(r, lkb, 0);
4318 out:
4319        unlock_rsb(r);
4320        put_rsb(r);
4321        dlm_put_lkb(lkb);
4322        return 0;
4323}
4324
4325static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4326{
4327        struct dlm_lkb *lkb;
4328        struct dlm_rsb *r;
4329        int error;
4330
4331        error = find_lkb(ls, ms->m_remid, &lkb);
4332        if (error)
4333                return error;
4334
4335        r = lkb->lkb_resource;
4336
4337        hold_rsb(r);
4338        lock_rsb(r);
4339
4340        error = validate_message(lkb, ms);
4341        if (error)
4342                goto out;
4343
4344        queue_bast(r, lkb, ms->m_bastmode);
4345        lkb->lkb_highbast = ms->m_bastmode;
4346 out:
4347        unlock_rsb(r);
4348        put_rsb(r);
4349        dlm_put_lkb(lkb);
4350        return 0;
4351}
4352
4353static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4354{
4355        int len, error, ret_nodeid, from_nodeid, our_nodeid;
4356
4357        from_nodeid = ms->m_header.h_nodeid;
4358        our_nodeid = dlm_our_nodeid();
4359
4360        len = receive_extralen(ms);
4361
4362        error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4363                                  &ret_nodeid, NULL);
4364
4365        /* Optimization: we're master so treat lookup as a request */
4366        if (!error && ret_nodeid == our_nodeid) {
4367                receive_request(ls, ms);
4368                return;
4369        }
4370        send_lookup_reply(ls, ms, ret_nodeid, error);
4371}
4372
4373static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4374{
4375        char name[DLM_RESNAME_MAXLEN+1];
4376        struct dlm_rsb *r;
4377        uint32_t hash, b;
4378        int rv, len, dir_nodeid, from_nodeid;
4379
4380        from_nodeid = ms->m_header.h_nodeid;
4381
4382        len = receive_extralen(ms);
4383
4384        if (len > DLM_RESNAME_MAXLEN) {
4385                log_error(ls, "receive_remove from %d bad len %d",
4386                          from_nodeid, len);
4387                return;
4388        }
4389
4390        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4391        if (dir_nodeid != dlm_our_nodeid()) {
4392                log_error(ls, "receive_remove from %d bad nodeid %d",
4393                          from_nodeid, dir_nodeid);
4394                return;
4395        }
4396
4397        /* Look for name on rsbtbl.toss, if it's there, kill it.
4398           If it's on rsbtbl.keep, it's being used, and we should ignore this
4399           message.  This is an expected race between the dir node sending a
4400           request to the master node at the same time as the master node sends
4401           a remove to the dir node.  The resolution to that race is for the
4402           dir node to ignore the remove message, and the master node to
4403           recreate the master rsb when it gets a request from the dir node for
4404           an rsb it doesn't have. */
4405
4406        memset(name, 0, sizeof(name));
4407        memcpy(name, ms->m_extra, len);
4408
4409        hash = jhash(name, len, 0);
4410        b = hash & (ls->ls_rsbtbl_size - 1);
4411
4412        spin_lock(&ls->ls_rsbtbl[b].lock);
4413
4414        rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4415        if (rv) {
4416                /* verify the rsb is on keep list per comment above */
4417                rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4418                if (rv) {
4419                        /* should not happen */
4420                        log_error(ls, "receive_remove from %d not found %s",
4421                                  from_nodeid, name);
4422                        spin_unlock(&ls->ls_rsbtbl[b].lock);
4423                        return;
4424                }
4425                if (r->res_master_nodeid != from_nodeid) {
4426                        /* should not happen */
4427                        log_error(ls, "receive_remove keep from %d master %d",
4428                                  from_nodeid, r->res_master_nodeid);
4429                        dlm_print_rsb(r);
4430                        spin_unlock(&ls->ls_rsbtbl[b].lock);
4431                        return;
4432                }
4433
4434                log_debug(ls, "receive_remove from %d master %d first %x %s",
4435                          from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4436                          name);
4437                spin_unlock(&ls->ls_rsbtbl[b].lock);
4438                return;
4439        }
4440
4441        if (r->res_master_nodeid != from_nodeid) {
4442                log_error(ls, "receive_remove toss from %d master %d",
4443                          from_nodeid, r->res_master_nodeid);
4444                dlm_print_rsb(r);
4445                spin_unlock(&ls->ls_rsbtbl[b].lock);
4446                return;
4447        }
4448
4449        if (kref_put(&r->res_ref, kill_rsb)) {
4450                rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4451                spin_unlock(&ls->ls_rsbtbl[b].lock);
4452                dlm_free_rsb(r);
4453        } else {
4454                log_error(ls, "receive_remove from %d rsb ref error",
4455                          from_nodeid);
4456                dlm_print_rsb(r);
4457                spin_unlock(&ls->ls_rsbtbl[b].lock);
4458        }
4459}
4460
4461static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4462{
4463        do_purge(ls, ms->m_nodeid, ms->m_pid);
4464}
4465
4466static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4467{
4468        struct dlm_lkb *lkb;
4469        struct dlm_rsb *r;
4470        int error, mstype, result;
4471        int from_nodeid = ms->m_header.h_nodeid;
4472
4473        error = find_lkb(ls, ms->m_remid, &lkb);
4474        if (error)
4475                return error;
4476
4477        r = lkb->lkb_resource;
4478        hold_rsb(r);
4479        lock_rsb(r);
4480
4481        error = validate_message(lkb, ms);
4482        if (error)
4483                goto out;
4484
4485        mstype = lkb->lkb_wait_type;
4486        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4487        if (error) {
4488                log_error(ls, "receive_request_reply %x remote %d %x result %d",
4489                          lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4490                dlm_dump_rsb(r);
4491                goto out;
4492        }
4493
4494        /* Optimization: the dir node was also the master, so it took our
4495           lookup as a request and sent request reply instead of lookup reply */
4496        if (mstype == DLM_MSG_LOOKUP) {
4497                r->res_master_nodeid = from_nodeid;
4498                r->res_nodeid = from_nodeid;
4499                lkb->lkb_nodeid = from_nodeid;
4500        }
4501
4502        /* this is the value returned from do_request() on the master */
4503        result = ms->m_result;
4504
4505        switch (result) {
4506        case -EAGAIN:
4507                /* request would block (be queued) on remote master */
4508                queue_cast(r, lkb, -EAGAIN);
4509                confirm_master(r, -EAGAIN);
4510                unhold_lkb(lkb); /* undoes create_lkb() */
4511                break;
4512
4513        case -EINPROGRESS:
4514        case 0:
4515                /* request was queued or granted on remote master */
4516                receive_flags_reply(lkb, ms);
4517                lkb->lkb_remid = ms->m_lkid;
4518                if (is_altmode(lkb))
4519                        munge_altmode(lkb, ms);
4520                if (result) {
4521                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
4522                        add_timeout(lkb);
4523                } else {
4524                        grant_lock_pc(r, lkb, ms);
4525                        queue_cast(r, lkb, 0);
4526                }
4527                confirm_master(r, result);
4528                break;
4529
4530        case -EBADR:
4531        case -ENOTBLK:
4532                /* find_rsb failed to find rsb or rsb wasn't master */
4533                log_limit(ls, "receive_request_reply %x from %d %d "
4534                          "master %d dir %d first %x %s", lkb->lkb_id,
4535                          from_nodeid, result, r->res_master_nodeid,
4536                          r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4537
4538                if (r->res_dir_nodeid != dlm_our_nodeid() &&
4539                    r->res_master_nodeid != dlm_our_nodeid()) {
4540                        /* cause _request_lock->set_master->send_lookup */
4541                        r->res_master_nodeid = 0;
4542                        r->res_nodeid = -1;
4543                        lkb->lkb_nodeid = -1;
4544                }
4545
4546                if (is_overlap(lkb)) {
4547                        /* we'll ignore error in cancel/unlock reply */
4548                        queue_cast_overlap(r, lkb);
4549                        confirm_master(r, result);
4550                        unhold_lkb(lkb); /* undoes create_lkb() */
4551                } else {
4552                        _request_lock(r, lkb);
4553
4554                        if (r->res_master_nodeid == dlm_our_nodeid())
4555                                confirm_master(r, 0);
4556                }
4557                break;
4558
4559        default:
4560                log_error(ls, "receive_request_reply %x error %d",
4561                          lkb->lkb_id, result);
4562        }
4563
4564        if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4565                log_debug(ls, "receive_request_reply %x result %d unlock",
4566                          lkb->lkb_id, result);
4567                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4568                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4569                send_unlock(r, lkb);
4570        } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4571                log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4572                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4573                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4574                send_cancel(r, lkb);
4575        } else {
4576                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4577                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4578        }
4579 out:
4580        unlock_rsb(r);
4581        put_rsb(r);
4582        dlm_put_lkb(lkb);
4583        return 0;
4584}
4585
4586static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4587                                    struct dlm_message *ms)
4588{
4589        /* this is the value returned from do_convert() on the master */
4590        switch (ms->m_result) {
4591        case -EAGAIN:
4592                /* convert would block (be queued) on remote master */
4593                queue_cast(r, lkb, -EAGAIN);
4594                break;
4595
4596        case -EDEADLK:
4597                receive_flags_reply(lkb, ms);
4598                revert_lock_pc(r, lkb);
4599                queue_cast(r, lkb, -EDEADLK);
4600                break;
4601
4602        case -EINPROGRESS:
4603                /* convert was queued on remote master */
4604                receive_flags_reply(lkb, ms);
4605                if (is_demoted(lkb))
4606                        munge_demoted(lkb);
4607                del_lkb(r, lkb);
4608                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4609                add_timeout(lkb);
4610                break;
4611
4612        case 0:
4613                /* convert was granted on remote master */
4614                receive_flags_reply(lkb, ms);
4615                if (is_demoted(lkb))
4616                        munge_demoted(lkb);
4617                grant_lock_pc(r, lkb, ms);
4618                queue_cast(r, lkb, 0);
4619                break;
4620
4621        default:
4622                log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4623                          lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4624                          ms->m_result);
4625                dlm_print_rsb(r);
4626                dlm_print_lkb(lkb);
4627        }
4628}
4629
4630static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4631{
4632        struct dlm_rsb *r = lkb->lkb_resource;
4633        int error;
4634
4635        hold_rsb(r);
4636        lock_rsb(r);
4637
4638        error = validate_message(lkb, ms);
4639        if (error)
4640                goto out;
4641
4642        /* stub reply can happen with waiters_mutex held */
4643        error = remove_from_waiters_ms(lkb, ms);
4644        if (error)
4645                goto out;
4646
4647        __receive_convert_reply(r, lkb, ms);
4648 out:
4649        unlock_rsb(r);
4650        put_rsb(r);
4651}
4652
4653static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4654{
4655        struct dlm_lkb *lkb;
4656        int error;
4657
4658        error = find_lkb(ls, ms->m_remid, &lkb);
4659        if (error)
4660                return error;
4661
4662        _receive_convert_reply(lkb, ms);
4663        dlm_put_lkb(lkb);
4664        return 0;
4665}
4666
4667static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4668{
4669        struct dlm_rsb *r = lkb->lkb_resource;
4670        int error;
4671
4672        hold_rsb(r);
4673        lock_rsb(r);
4674
4675        error = validate_message(lkb, ms);
4676        if (error)
4677                goto out;
4678
4679        /* stub reply can happen with waiters_mutex held */
4680        error = remove_from_waiters_ms(lkb, ms);
4681        if (error)
4682                goto out;
4683
4684        /* this is the value returned from do_unlock() on the master */
4685
4686        switch (ms->m_result) {
4687        case -DLM_EUNLOCK:
4688                receive_flags_reply(lkb, ms);
4689                remove_lock_pc(r, lkb);
4690                queue_cast(r, lkb, -DLM_EUNLOCK);
4691                break;
4692        case -ENOENT:
4693                break;
4694        default:
4695                log_error(r->res_ls, "receive_unlock_reply %x error %d",
4696                          lkb->lkb_id, ms->m_result);
4697        }
4698 out:
4699        unlock_rsb(r);
4700        put_rsb(r);
4701}
4702
4703static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4704{
4705        struct dlm_lkb *lkb;
4706        int error;
4707
4708        error = find_lkb(ls, ms->m_remid, &lkb);
4709        if (error)
4710                return error;
4711
4712        _receive_unlock_reply(lkb, ms);
4713        dlm_put_lkb(lkb);
4714        return 0;
4715}
4716
4717static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4718{
4719        struct dlm_rsb *r = lkb->lkb_resource;
4720        int error;
4721
4722        hold_rsb(r);
4723        lock_rsb(r);
4724
4725        error = validate_message(lkb, ms);
4726        if (error)
4727                goto out;
4728
4729        /* stub reply can happen with waiters_mutex held */
4730        error = remove_from_waiters_ms(lkb, ms);
4731        if (error)
4732                goto out;
4733
4734        /* this is the value returned from do_cancel() on the master */
4735
4736        switch (ms->m_result) {
4737        case -DLM_ECANCEL:
4738                receive_flags_reply(lkb, ms);
4739                revert_lock_pc(r, lkb);
4740                queue_cast(r, lkb, -DLM_ECANCEL);
4741                break;
4742        case 0:
4743                break;
4744        default:
4745                log_error(r->res_ls, "receive_cancel_reply %x error %d",
4746                          lkb->lkb_id, ms->m_result);
4747        }
4748 out:
4749        unlock_rsb(r);
4750        put_rsb(r);
4751}
4752
4753static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4754{
4755        struct dlm_lkb *lkb;
4756        int error;
4757
4758        error = find_lkb(ls, ms->m_remid, &lkb);
4759        if (error)
4760                return error;
4761
4762        _receive_cancel_reply(lkb, ms);
4763        dlm_put_lkb(lkb);
4764        return 0;
4765}
4766
4767static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4768{
4769        struct dlm_lkb *lkb;
4770        struct dlm_rsb *r;
4771        int error, ret_nodeid;
4772        int do_lookup_list = 0;
4773
4774        error = find_lkb(ls, ms->m_lkid, &lkb);
4775        if (error) {
4776                log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4777                return;
4778        }
4779
4780        /* ms->m_result is the value returned by dlm_master_lookup on dir node
4781           FIXME: will a non-zero error ever be returned? */
4782
4783        r = lkb->lkb_resource;
4784        hold_rsb(r);
4785        lock_rsb(r);
4786
4787        error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4788        if (error)
4789                goto out;
4790
4791        ret_nodeid = ms->m_nodeid;
4792
4793        /* We sometimes receive a request from the dir node for this
4794           rsb before we've received the dir node's loookup_reply for it.
4795           The request from the dir node implies we're the master, so we set
4796           ourself as master in receive_request_reply, and verify here that
4797           we are indeed the master. */
4798
4799        if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4800                /* This should never happen */
4801                log_error(ls, "receive_lookup_reply %x from %d ret %d "
4802                          "master %d dir %d our %d first %x %s",
4803                          lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4804                          r->res_master_nodeid, r->res_dir_nodeid,
4805                          dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4806        }
4807
4808        if (ret_nodeid == dlm_our_nodeid()) {
4809                r->res_master_nodeid = ret_nodeid;
4810                r->res_nodeid = 0;
4811                do_lookup_list = 1;
4812                r->res_first_lkid = 0;
4813        } else if (ret_nodeid == -1) {
4814                /* the remote node doesn't believe it's the dir node */
4815                log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4816                          lkb->lkb_id, ms->m_header.h_nodeid);
4817                r->res_master_nodeid = 0;
4818                r->res_nodeid = -1;
4819                lkb->lkb_nodeid = -1;
4820        } else {
4821                /* set_master() will set lkb_nodeid from r */
4822                r->res_master_nodeid = ret_nodeid;
4823                r->res_nodeid = ret_nodeid;
4824        }
4825
4826        if (is_overlap(lkb)) {
4827                log_debug(ls, "receive_lookup_reply %x unlock %x",
4828                          lkb->lkb_id, lkb->lkb_flags);
4829                queue_cast_overlap(r, lkb);
4830                unhold_lkb(lkb); /* undoes create_lkb() */
4831                goto out_list;
4832        }
4833
4834        _request_lock(r, lkb);
4835
4836 out_list:
4837        if (do_lookup_list)
4838                process_lookup_list(r);
4839 out:
4840        unlock_rsb(r);
4841        put_rsb(r);
4842        dlm_put_lkb(lkb);
4843}
4844
4845static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4846                             uint32_t saved_seq)
4847{
4848        int error = 0, noent = 0;
4849
4850        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4851                log_limit(ls, "receive %d from non-member %d %x %x %d",
4852                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4853                          ms->m_remid, ms->m_result);
4854                return;
4855        }
4856
4857        switch (ms->m_type) {
4858
4859        /* messages sent to a master node */
4860
4861        case DLM_MSG_REQUEST:
4862                error = receive_request(ls, ms);
4863                break;
4864
4865        case DLM_MSG_CONVERT:
4866                error = receive_convert(ls, ms);
4867                break;
4868
4869        case DLM_MSG_UNLOCK:
4870                error = receive_unlock(ls, ms);
4871                break;
4872
4873        case DLM_MSG_CANCEL:
4874                noent = 1;
4875                error = receive_cancel(ls, ms);
4876                break;
4877
4878        /* messages sent from a master node (replies to above) */
4879
4880        case DLM_MSG_REQUEST_REPLY:
4881                error = receive_request_reply(ls, ms);
4882                break;
4883
4884        case DLM_MSG_CONVERT_REPLY:
4885                error = receive_convert_reply(ls, ms);
4886                break;
4887
4888        case DLM_MSG_UNLOCK_REPLY:
4889                error = receive_unlock_reply(ls, ms);
4890                break;
4891
4892        case DLM_MSG_CANCEL_REPLY:
4893                error = receive_cancel_reply(ls, ms);
4894                break;
4895
4896        /* messages sent from a master node (only two types of async msg) */
4897
4898        case DLM_MSG_GRANT:
4899                noent = 1;
4900                error = receive_grant(ls, ms);
4901                break;
4902
4903        case DLM_MSG_BAST:
4904                noent = 1;
4905                error = receive_bast(ls, ms);
4906                break;
4907
4908        /* messages sent to a dir node */
4909
4910        case DLM_MSG_LOOKUP:
4911                receive_lookup(ls, ms);
4912                break;
4913
4914        case DLM_MSG_REMOVE:
4915                receive_remove(ls, ms);
4916                break;
4917
4918        /* messages sent from a dir node (remove has no reply) */
4919
4920        case DLM_MSG_LOOKUP_REPLY:
4921                receive_lookup_reply(ls, ms);
4922                break;
4923
4924        /* other messages */
4925
4926        case DLM_MSG_PURGE:
4927                receive_purge(ls, ms);
4928                break;
4929
4930        default:
4931                log_error(ls, "unknown message type %d", ms->m_type);
4932        }
4933
4934        /*
4935         * When checking for ENOENT, we're checking the result of
4936         * find_lkb(m_remid):
4937         *
4938         * The lock id referenced in the message wasn't found.  This may
4939         * happen in normal usage for the async messages and cancel, so
4940         * only use log_debug for them.
4941         *
4942         * Some errors are expected and normal.
4943         */
4944
4945        if (error == -ENOENT && noent) {
4946                log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4947                          ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4948                          ms->m_lkid, saved_seq);
4949        } else if (error == -ENOENT) {
4950                log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4951                          ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4952                          ms->m_lkid, saved_seq);
4953
4954                if (ms->m_type == DLM_MSG_CONVERT)
4955                        dlm_dump_rsb_hash(ls, ms->m_hash);
4956        }
4957
4958        if (error == -EINVAL) {
4959                log_error(ls, "receive %d inval from %d lkid %x remid %x "
4960                          "saved_seq %u",
4961                          ms->m_type, ms->m_header.h_nodeid,
4962                          ms->m_lkid, ms->m_remid, saved_seq);
4963        }
4964}
4965
4966/* If the lockspace is in recovery mode (locking stopped), then normal
4967   messages are saved on the requestqueue for processing after recovery is
4968   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4969   messages off the requestqueue before we process new ones. This occurs right
4970   after recovery completes when we transition from saving all messages on
4971   requestqueue, to processing all the saved messages, to processing new
4972   messages as they arrive. */
4973
4974static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4975                                int nodeid)
4976{
4977        if (dlm_locking_stopped(ls)) {
4978                /* If we were a member of this lockspace, left, and rejoined,
4979                   other nodes may still be sending us messages from the
4980                   lockspace generation before we left. */
4981                if (!ls->ls_generation) {
4982                        log_limit(ls, "receive %d from %d ignore old gen",
4983                                  ms->m_type, nodeid);
4984                        return;
4985                }
4986
4987                dlm_add_requestqueue(ls, nodeid, ms);
4988        } else {
4989                dlm_wait_requestqueue(ls);
4990                _receive_message(ls, ms, 0);
4991        }
4992}
4993
/* Entry point used by dlm_recoverd for messages that were saved on the
   requestqueue.  The message goes straight to _receive_message() together
   with saved_seq. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
			       uint32_t saved_seq)
{
	struct dlm_message *saved = ms;

	_receive_message(ls, saved, saved_seq);
}
5002
5003/* This is called by the midcomms layer when something is received for
5004   the lockspace.  It could be either a MSG (normal message sent as part of
5005   standard locking activity) or an RCOM (recovery message sent as part of
5006   lockspace recovery). */
5007
5008void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5009{
5010        struct dlm_header *hd = &p->header;
5011        struct dlm_ls *ls;
5012        int type = 0;
5013
5014        switch (hd->h_cmd) {
5015        case DLM_MSG:
5016                dlm_message_in(&p->message);
5017                type = p->message.m_type;
5018                break;
5019        case DLM_RCOM:
5020                dlm_rcom_in(&p->rcom);
5021                type = p->rcom.rc_type;
5022                break;
5023        default:
5024                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5025                return;
5026        }
5027
5028        if (hd->h_nodeid != nodeid) {
5029                log_print("invalid h_nodeid %d from %d lockspace %x",
5030                          hd->h_nodeid, nodeid, hd->h_lockspace);
5031                return;
5032        }
5033
5034        ls = dlm_find_lockspace_global(hd->h_lockspace);
5035        if (!ls) {
5036                if (dlm_config.ci_log_debug) {
5037                        printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5038                                "%u from %d cmd %d type %d\n",
5039                                hd->h_lockspace, nodeid, hd->h_cmd, type);
5040                }
5041
5042                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5043                        dlm_send_ls_not_ready(nodeid, &p->rcom);
5044                return;
5045        }
5046
5047        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5048           be inactive (in this ls) before transitioning to recovery mode */
5049
5050        down_read(&ls->ls_recv_active);
5051        if (hd->h_cmd == DLM_MSG)
5052                dlm_receive_message(ls, &p->message, nodeid);
5053        else
5054                dlm_receive_rcom(ls, &p->rcom, nodeid);
5055        up_read(&ls->ls_recv_active);
5056
5057        dlm_put_lockspace(ls);
5058}
5059
5060static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5061                                   struct dlm_message *ms_stub)
5062{
5063        if (middle_conversion(lkb)) {
5064                hold_lkb(lkb);
5065                memset(ms_stub, 0, sizeof(struct dlm_message));
5066                ms_stub->m_flags = DLM_IFL_STUB_MS;
5067                ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5068                ms_stub->m_result = -EINPROGRESS;
5069                ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5070                _receive_convert_reply(lkb, ms_stub);
5071
5072                /* Same special case as in receive_rcom_lock_args() */
5073                lkb->lkb_grmode = DLM_LOCK_IV;
5074                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5075                unhold_lkb(lkb);
5076
5077        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5078                lkb->lkb_flags |= DLM_IFL_RESEND;
5079        }
5080
5081        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5082           conversions are async; there's no reply from the remote master */
5083}
5084
5085/* A waiting lkb needs recovery if the master node has failed, or
5086   the master node is changing (only when no directory is used) */
5087
5088static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5089                                 int dir_nodeid)
5090{
5091        if (dlm_no_directory(ls))
5092                return 1;
5093
5094        if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5095                return 1;
5096
5097        return 0;
5098}
5099
5100/* Recovery for locks that are waiting for replies from nodes that are now
5101   gone.  We can just complete unlocks and cancels by faking a reply from the
5102   dead node.  Requests and up-conversions we flag to be resent after
5103   recovery.  Down-conversions can just be completed with a fake reply like
5104   unlocks.  Conversions between PR and CW need special attention. */
5105
5106void dlm_recover_waiters_pre(struct dlm_ls *ls)
5107{
5108        struct dlm_lkb *lkb, *safe;
5109        struct dlm_message *ms_stub;
5110        int wait_type, stub_unlock_result, stub_cancel_result;
5111        int dir_nodeid;
5112
5113        ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5114        if (!ms_stub) {
5115                log_error(ls, "dlm_recover_waiters_pre no mem");
5116                return;
5117        }
5118
5119        mutex_lock(&ls->ls_waiters_mutex);
5120
5121        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5122
5123                dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5124
5125                /* exclude debug messages about unlocks because there can be so
5126                   many and they aren't very interesting */
5127
5128                if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5129                        log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5130                                  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5131                                  lkb->lkb_id,
5132                                  lkb->lkb_remid,
5133                                  lkb->lkb_wait_type,
5134                                  lkb->lkb_resource->res_nodeid,
5135                                  lkb->lkb_nodeid,
5136                                  lkb->lkb_wait_nodeid,
5137                                  dir_nodeid);
5138                }
5139
5140                /* all outstanding lookups, regardless of destination  will be
5141                   resent after recovery is done */
5142
5143                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5144                        lkb->lkb_flags |= DLM_IFL_RESEND;
5145                        continue;
5146                }
5147
5148                if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5149                        continue;
5150
5151                wait_type = lkb->lkb_wait_type;
5152                stub_unlock_result = -DLM_EUNLOCK;
5153                stub_cancel_result = -DLM_ECANCEL;
5154
5155                /* Main reply may have been received leaving a zero wait_type,
5156                   but a reply for the overlapping op may not have been
5157                   received.  In that case we need to fake the appropriate
5158                   reply for the overlap op. */
5159
5160                if (!wait_type) {
5161                        if (is_overlap_cancel(lkb)) {
5162                                wait_type = DLM_MSG_CANCEL;
5163                                if (lkb->lkb_grmode == DLM_LOCK_IV)
5164                                        stub_cancel_result = 0;
5165                        }
5166                        if (is_overlap_unlock(lkb)) {
5167                                wait_type = DLM_MSG_UNLOCK;
5168                                if (lkb->lkb_grmode == DLM_LOCK_IV)
5169                                        stub_unlock_result = -ENOENT;
5170                        }
5171
5172                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
5173                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
5174                                  stub_cancel_result, stub_unlock_result);
5175                }
5176
5177                switch (wait_type) {
5178
5179                case DLM_MSG_REQUEST:
5180                        lkb->lkb_flags |= DLM_IFL_RESEND;
5181                        break;
5182
5183                case DLM_MSG_CONVERT:
5184                        recover_convert_waiter(ls, lkb, ms_stub);
5185                        break;
5186
5187                case DLM_MSG_UNLOCK:
5188                        hold_lkb(lkb);
5189                        memset(ms_stub, 0, sizeof(struct dlm_message));
5190                        ms_stub->m_flags = DLM_IFL_STUB_MS;
5191                        ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5192                        ms_stub->m_result = stub_unlock_result;
5193                        ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5194                        _receive_unlock_reply(lkb, ms_stub);
5195                        dlm_put_lkb(lkb);
5196                        break;
5197
5198                case DLM_MSG_CANCEL:
5199                        hold_lkb(lkb);
5200                        memset(ms_stub, 0, sizeof(struct dlm_message));
5201                        ms_stub->m_flags = DLM_IFL_STUB_MS;
5202                        ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5203                        ms_stub->m_result = stub_cancel_result;
5204                        ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5205                        _receive_cancel_reply(lkb, ms_stub);
5206                        dlm_put_lkb(lkb);
5207                        break;
5208
5209                default:
5210                        log_error(ls, "invalid lkb wait_type %d %d",
5211                                  lkb->lkb_wait_type, wait_type);
5212                }
5213                schedule();
5214        }
5215        mutex_unlock(&ls->ls_waiters_mutex);
5216        kfree(ms_stub);
5217}
5218
5219static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5220{
5221        struct dlm_lkb *lkb;
5222        int found = 0;
5223
5224        mutex_lock(&ls->ls_waiters_mutex);
5225        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5226                if (lkb->lkb_flags & DLM_IFL_RESEND) {
5227                        hold_lkb(lkb);
5228                        found = 1;
5229                        break;
5230                }
5231        }
5232        mutex_unlock(&ls->ls_waiters_mutex);
5233
5234        if (!found)
5235                lkb = NULL;
5236        return lkb;
5237}
5238
5239/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5240   master or dir-node for r.  Processing the lkb may result in it being placed
5241   back on waiters. */
5242
5243/* We do this after normal locking has been enabled and any saved messages
5244   (in requestqueue) have been processed.  We should be confident that at
5245   this point we won't get or process a reply to any of these waiting
5246   operations.  But, new ops may be coming in on the rsbs/locks here from
5247   userspace or remotely. */
5248
/* There may have been an overlap unlock/cancel prior to recovery or after
   recovery.  If before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.  We
   can be confident here that any replies to either the initial op or overlap
   ops prior to recovery have been received. */
5254
5255int dlm_recover_waiters_post(struct dlm_ls *ls)
5256{
5257        struct dlm_lkb *lkb;
5258        struct dlm_rsb *r;
5259        int error = 0, mstype, err, oc, ou;
5260
5261        while (1) {
5262                if (dlm_locking_stopped(ls)) {
5263                        log_debug(ls, "recover_waiters_post aborted");
5264                        error = -EINTR;
5265                        break;
5266                }
5267
5268                lkb = find_resend_waiter(ls);
5269                if (!lkb)
5270                        break;
5271
5272                r = lkb->lkb_resource;
5273                hold_rsb(r);
5274                lock_rsb(r);
5275
5276                mstype = lkb->lkb_wait_type;
5277                oc = is_overlap_cancel(lkb);
5278                ou = is_overlap_unlock(lkb);
5279                err = 0;
5280
5281                log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5282                          "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5283                          "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5284                          r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5285                          dlm_dir_nodeid(r), oc, ou);
5286
5287                /* At this point we assume that we won't get a reply to any
5288                   previous op or overlap op on this lock.  First, do a big
5289                   remove_from_waiters() for all previous ops. */
5290
5291                lkb->lkb_flags &= ~DLM_IFL_RESEND;
5292                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5293                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5294                lkb->lkb_wait_type = 0;
5295                lkb->lkb_wait_count = 0;
5296                mutex_lock(&ls->ls_waiters_mutex);
5297                list_del_init(&lkb->lkb_wait_reply);
5298                mutex_unlock(&ls->ls_waiters_mutex);
5299                unhold_lkb(lkb); /* for waiters list */
5300
5301                if (oc || ou) {
5302                        /* do an unlock or cancel instead of resending */
5303                        switch (mstype) {
5304                        case DLM_MSG_LOOKUP:
5305                        case DLM_MSG_REQUEST:
5306                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5307                                                        -DLM_ECANCEL);
5308                                unhold_lkb(lkb); /* undoes create_lkb() */
5309                                break;
5310                        case DLM_MSG_CONVERT:
5311                                if (oc) {
5312                                        queue_cast(r, lkb, -DLM_ECANCEL);
5313                                } else {
5314                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5315                                        _unlock_lock(r, lkb);
5316                                }
5317                                break;
5318                        default:
5319                                err = 1;
5320                        }
5321                } else {
5322                        switch (mstype) {
5323                        case DLM_MSG_LOOKUP:
5324                        case DLM_MSG_REQUEST:
5325                                _request_lock(r, lkb);
5326                                if (is_master(r))
5327                                        confirm_master(r, 0);
5328                                break;
5329                        case DLM_MSG_CONVERT:
5330                                _convert_lock(r, lkb);
5331                                break;
5332                        default:
5333                                err = 1;
5334                        }
5335                }
5336
5337                if (err) {
5338                        log_error(ls, "waiter %x msg %d r_nodeid %d "
5339                                  "dir_nodeid %d overlap %d %d",
5340                                  lkb->lkb_id, mstype, r->res_nodeid,
5341                                  dlm_dir_nodeid(r), oc, ou);
5342                }
5343                unlock_rsb(r);
5344                put_rsb(r);
5345                dlm_put_lkb(lkb);
5346        }
5347
5348        return error;
5349}
5350
5351static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5352                              struct list_head *list)
5353{
5354        struct dlm_lkb *lkb, *safe;
5355
5356        list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5357                if (!is_master_copy(lkb))
5358                        continue;
5359
5360                /* don't purge lkbs we've added in recover_master_copy for
5361                   the current recovery seq */
5362
5363                if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5364                        continue;
5365
5366                del_lkb(r, lkb);
5367
5368                /* this put should free the lkb */
5369                if (!dlm_put_lkb(lkb))
5370                        log_error(ls, "purged mstcpy lkb not released");
5371        }
5372}
5373
5374void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5375{
5376        struct dlm_ls *ls = r->res_ls;
5377
5378        purge_mstcpy_list(ls, r, &r->res_grantqueue);
5379        purge_mstcpy_list(ls, r, &r->res_convertqueue);
5380        purge_mstcpy_list(ls, r, &r->res_waitqueue);
5381}
5382
5383static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5384                            struct list_head *list,
5385                            int nodeid_gone, unsigned int *count)
5386{
5387        struct dlm_lkb *lkb, *safe;
5388
5389        list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5390                if (!is_master_copy(lkb))
5391                        continue;
5392
5393                if ((lkb->lkb_nodeid == nodeid_gone) ||
5394                    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5395
5396                        /* tell recover_lvb to invalidate the lvb
5397                           because a node holding EX/PW failed */
5398                        if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5399                            (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5400                                rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5401                        }
5402
5403                        del_lkb(r, lkb);
5404
5405                        /* this put should free the lkb */
5406                        if (!dlm_put_lkb(lkb))
5407                                log_error(ls, "purged dead lkb not released");
5408
5409                        rsb_set_flag(r, RSB_RECOVER_GRANT);
5410
5411                        (*count)++;
5412                }
5413        }
5414}
5415
5416/* Get rid of locks held by nodes that are gone. */
5417
5418void dlm_recover_purge(struct dlm_ls *ls)
5419{
5420        struct dlm_rsb *r;
5421        struct dlm_member *memb;
5422        int nodes_count = 0;
5423        int nodeid_gone = 0;
5424        unsigned int lkb_count = 0;
5425
5426        /* cache one removed nodeid to optimize the common
5427           case of a single node removed */
5428
5429        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5430                nodes_count++;
5431                nodeid_gone = memb->nodeid;
5432        }
5433
5434        if (!nodes_count)
5435                return;
5436
5437        down_write(&ls->ls_root_sem);
5438        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5439                hold_rsb(r);
5440                lock_rsb(r);
5441                if (is_master(r)) {
5442                        purge_dead_list(ls, r, &r->res_grantqueue,
5443                                        nodeid_gone, &lkb_count);
5444                        purge_dead_list(ls, r, &r->res_convertqueue,
5445                                        nodeid_gone, &lkb_count);
5446                        purge_dead_list(ls, r, &r->res_waitqueue,
5447                                        nodeid_gone, &lkb_count);
5448                }
5449                unlock_rsb(r);
5450                unhold_rsb(r);
5451                cond_resched();
5452        }
5453        up_write(&ls->ls_root_sem);
5454
5455        if (lkb_count)
5456                log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5457                          lkb_count, nodes_count);
5458}
5459
5460static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5461{
5462        struct rb_node *n;
5463        struct dlm_rsb *r;
5464
5465        spin_lock(&ls->ls_rsbtbl[bucket].lock);
5466        for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5467                r = rb_entry(n, struct dlm_rsb, res_hashnode);
5468
5469                if (!rsb_flag(r, RSB_RECOVER_GRANT))
5470                        continue;
5471                if (!is_master(r)) {
5472                        rsb_clear_flag(r, RSB_RECOVER_GRANT);
5473                        continue;
5474                }
5475                hold_rsb(r);
5476                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5477                return r;
5478        }
5479        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5480        return NULL;
5481}
5482
5483/*
5484 * Attempt to grant locks on resources that we are the master of.
5485 * Locks may have become grantable during recovery because locks
5486 * from departed nodes have been purged (or not rebuilt), allowing
5487 * previously blocked locks to now be granted.  The subset of rsb's
5488 * we are interested in are those with lkb's on either the convert or
5489 * waiting queues.
5490 *
5491 * Simplest would be to go through each master rsb and check for non-empty
5492 * convert or waiting queues, and attempt to grant on those rsbs.
5493 * Checking the queues requires lock_rsb, though, for which we'd need
5494 * to release the rsbtbl lock.  This would make iterating through all
5495 * rsb's very inefficient.  So, we rely on earlier recovery routines
5496 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5497 * locks for.
5498 */
5499
5500void dlm_recover_grant(struct dlm_ls *ls)
5501{
5502        struct dlm_rsb *r;
5503        int bucket = 0;
5504        unsigned int count = 0;
5505        unsigned int rsb_count = 0;
5506        unsigned int lkb_count = 0;
5507
5508        while (1) {
5509                r = find_grant_rsb(ls, bucket);
5510                if (!r) {
5511                        if (bucket == ls->ls_rsbtbl_size - 1)
5512                                break;
5513                        bucket++;
5514                        continue;
5515                }
5516                rsb_count++;
5517                count = 0;
5518                lock_rsb(r);
5519                /* the RECOVER_GRANT flag is checked in the grant path */
5520                grant_pending_locks(r, &count);
5521                rsb_clear_flag(r, RSB_RECOVER_GRANT);
5522                lkb_count += count;
5523                confirm_master(r, 0);
5524                unlock_rsb(r);
5525                put_rsb(r);
5526                cond_resched();
5527        }
5528
5529        if (lkb_count)
5530                log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5531                          lkb_count, rsb_count);
5532}
5533
5534static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5535                                         uint32_t remid)
5536{
5537        struct dlm_lkb *lkb;
5538
5539        list_for_each_entry(lkb, head, lkb_statequeue) {
5540                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5541                        return lkb;
5542        }
5543        return NULL;
5544}
5545
5546static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5547                                    uint32_t remid)
5548{
5549        struct dlm_lkb *lkb;
5550
5551        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5552        if (lkb)
5553                return lkb;
5554        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5555        if (lkb)
5556                return lkb;
5557        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5558        if (lkb)
5559                return lkb;
5560        return NULL;
5561}
5562
5563/* needs at least dlm_rcom + rcom_lock */
5564static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5565                                  struct dlm_rsb *r, struct dlm_rcom *rc)
5566{
5567        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5568
5569        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5570        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5571        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5572        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5573        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5574        lkb->lkb_flags |= DLM_IFL_MSTCPY;
5575        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5576        lkb->lkb_rqmode = rl->rl_rqmode;
5577        lkb->lkb_grmode = rl->rl_grmode;
5578        /* don't set lkb_status because add_lkb wants to itself */
5579
5580        lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5581        lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5582
5583        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5584                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5585                         sizeof(struct rcom_lock);
5586                if (lvblen > ls->ls_lvblen)
5587                        return -EINVAL;
5588                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5589                if (!lkb->lkb_lvbptr)
5590                        return -ENOMEM;
5591                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5592        }
5593
5594        /* Conversions between PR and CW (middle modes) need special handling.
5595           The real granted mode of these converting locks cannot be determined
5596           until all locks have been rebuilt on the rsb (recover_conversion) */
5597
5598        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5599            middle_conversion(lkb)) {
5600                rl->rl_status = DLM_LKSTS_CONVERT;
5601                lkb->lkb_grmode = DLM_LOCK_IV;
5602                rsb_set_flag(r, RSB_RECOVER_CONVERT);
5603        }
5604
5605        return 0;
5606}
5607
5608/* This lkb may have been recovered in a previous aborted recovery so we need
5609   to check if the rsb already has an lkb with the given remote nodeid/lkid.
5610   If so we just send back a standard reply.  If not, we create a new lkb with
5611   the given values and send back our lkid.  We send back our lkid by sending
5612   back the rcom_lock struct we got but with the remid field filled in. */
5613
5614/* needs at least dlm_rcom + rcom_lock */
5615int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5616{
5617        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5618        struct dlm_rsb *r;
5619        struct dlm_lkb *lkb;
5620        uint32_t remid = 0;
5621        int from_nodeid = rc->rc_header.h_nodeid;
5622        int error;
5623
5624        if (rl->rl_parent_lkid) {
5625                error = -EOPNOTSUPP;
5626                goto out;
5627        }
5628
5629        remid = le32_to_cpu(rl->rl_lkid);
5630
5631        /* In general we expect the rsb returned to be R_MASTER, but we don't
5632           have to require it.  Recovery of masters on one node can overlap
5633           recovery of locks on another node, so one node can send us MSTCPY
5634           locks before we've made ourselves master of this rsb.  We can still
5635           add new MSTCPY locks that we receive here without any harm; when
5636           we make ourselves master, dlm_recover_masters() won't touch the
5637           MSTCPY locks we've received early. */
5638
5639        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5640                         from_nodeid, R_RECEIVE_RECOVER, &r);
5641        if (error)
5642                goto out;
5643
5644        lock_rsb(r);
5645
5646        if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5647                log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5648                          from_nodeid, remid);
5649                error = -EBADR;
5650                goto out_unlock;
5651        }
5652
5653        lkb = search_remid(r, from_nodeid, remid);
5654        if (lkb) {
5655                error = -EEXIST;
5656                goto out_remid;
5657        }
5658
5659        error = create_lkb(ls, &lkb);
5660        if (error)
5661                goto out_unlock;
5662
5663        error = receive_rcom_lock_args(ls, lkb, r, rc);
5664        if (error) {
5665                __put_lkb(ls, lkb);
5666                goto out_unlock;
5667        }
5668
5669        attach_lkb(r, lkb);
5670        add_lkb(r, lkb, rl->rl_status);
5671        error = 0;
5672        ls->ls_recover_locks_in++;
5673
5674        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5675                rsb_set_flag(r, RSB_RECOVER_GRANT);
5676
5677 out_remid:
5678        /* this is the new value returned to the lock holder for
5679           saving in its process-copy lkb */
5680        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5681
5682        lkb->lkb_recover_seq = ls->ls_recover_seq;
5683
5684 out_unlock:
5685        unlock_rsb(r);
5686        put_rsb(r);
5687 out:
5688        if (error && error != -EEXIST)
5689                log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5690                          from_nodeid, remid, error);
5691        rl->rl_result = cpu_to_le32(error);
5692        return error;
5693}
5694
5695/* needs at least dlm_rcom + rcom_lock */
5696int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5697{
5698        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5699        struct dlm_rsb *r;
5700        struct dlm_lkb *lkb;
5701        uint32_t lkid, remid;
5702        int error, result;
5703
5704        lkid = le32_to_cpu(rl->rl_lkid);
5705        remid = le32_to_cpu(rl->rl_remid);
5706        result = le32_to_cpu(rl->rl_result);
5707
5708        error = find_lkb(ls, lkid, &lkb);
5709        if (error) {
5710                log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5711                          lkid, rc->rc_header.h_nodeid, remid, result);
5712                return error;
5713        }
5714
5715        r = lkb->lkb_resource;
5716        hold_rsb(r);
5717        lock_rsb(r);
5718
5719        if (!is_process_copy(lkb)) {
5720                log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5721                          lkid, rc->rc_header.h_nodeid, remid, result);
5722                dlm_dump_rsb(r);
5723                unlock_rsb(r);
5724                put_rsb(r);
5725                dlm_put_lkb(lkb);
5726                return -EINVAL;
5727        }
5728
5729        switch (result) {
5730        case -EBADR:
5731                /* There's a chance the new master received our lock before
5732                   dlm_recover_master_reply(), this wouldn't happen if we did
5733                   a barrier between recover_masters and recover_locks. */
5734
5735                log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5736                          lkid, rc->rc_header.h_nodeid, remid, result);
5737        
5738                dlm_send_rcom_lock(r, lkb);
5739                goto out;
5740        case -EEXIST:
5741        case 0:
5742                lkb->lkb_remid = remid;
5743                break;
5744        default:
5745                log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5746                          lkid, rc->rc_header.h_nodeid, remid, result);
5747        }
5748
5749        /* an ack for dlm_recover_locks() which waits for replies from
5750           all the locks it sends to new masters */
5751        dlm_recovered_lock(r);
5752 out:
5753        unlock_rsb(r);
5754        put_rsb(r);
5755        dlm_put_lkb(lkb);
5756
5757        return 0;
5758}
5759
5760int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5761                     int mode, uint32_t flags, void *name, unsigned int namelen,
5762                     unsigned long timeout_cs)
5763{
5764        struct dlm_lkb *lkb;
5765        struct dlm_args args;
5766        int error;
5767
5768        dlm_lock_recovery(ls);
5769
5770        error = create_lkb(ls, &lkb);
5771        if (error) {
5772                kfree(ua);
5773                goto out;
5774        }
5775
5776        if (flags & DLM_LKF_VALBLK) {
5777                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5778                if (!ua->lksb.sb_lvbptr) {
5779                        kfree(ua);
5780                        __put_lkb(ls, lkb);
5781                        error = -ENOMEM;
5782                        goto out;
5783                }
5784        }
5785
5786        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5787           When DLM_IFL_USER is set, the dlm knows that this is a userspace
5788           lock and that lkb_astparam is the dlm_user_args structure. */
5789
5790        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5791                              fake_astfn, ua, fake_bastfn, &args);
5792        lkb->lkb_flags |= DLM_IFL_USER;
5793
5794        if (error) {
5795                __put_lkb(ls, lkb);
5796                goto out;
5797        }
5798
5799        error = request_lock(ls, lkb, name, namelen, &args);
5800
5801        switch (error) {
5802        case 0:
5803                break;
5804        case -EINPROGRESS:
5805                error = 0;
5806                break;
5807        case -EAGAIN:
5808                error = 0;
5809                /* fall through */
5810        default:
5811                __put_lkb(ls, lkb);
5812                goto out;
5813        }
5814
5815        /* add this new lkb to the per-process list of locks */
5816        spin_lock(&ua->proc->locks_spin);
5817        hold_lkb(lkb);
5818        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5819        spin_unlock(&ua->proc->locks_spin);
5820 out:
5821        dlm_unlock_recovery(ls);
5822        return error;
5823}
5824
5825int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5826                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5827                     unsigned long timeout_cs)
5828{
5829        struct dlm_lkb *lkb;
5830        struct dlm_args args;
5831        struct dlm_user_args *ua;
5832        int error;
5833
5834        dlm_lock_recovery(ls);
5835
5836        error = find_lkb(ls, lkid, &lkb);
5837        if (error)
5838                goto out;
5839
5840        /* user can change the params on its lock when it converts it, or
5841           add an lvb that didn't exist before */
5842
5843        ua = lkb->lkb_ua;
5844
5845        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5846                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5847                if (!ua->lksb.sb_lvbptr) {
5848                        error = -ENOMEM;
5849                        goto out_put;
5850                }
5851        }
5852        if (lvb_in && ua->lksb.sb_lvbptr)
5853                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5854
5855        ua->xid = ua_tmp->xid;
5856        ua->castparam = ua_tmp->castparam;
5857        ua->castaddr = ua_tmp->castaddr;
5858        ua->bastparam = ua_tmp->bastparam;
5859        ua->bastaddr = ua_tmp->bastaddr;
5860        ua->user_lksb = ua_tmp->user_lksb;
5861
5862        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5863                              fake_astfn, ua, fake_bastfn, &args);
5864        if (error)
5865                goto out_put;
5866
5867        error = convert_lock(ls, lkb, &args);
5868
5869        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5870                error = 0;
5871 out_put:
5872        dlm_put_lkb(lkb);
5873 out:
5874        dlm_unlock_recovery(ls);
5875        kfree(ua_tmp);
5876        return error;
5877}
5878
5879int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5880                    uint32_t flags, uint32_t lkid, char *lvb_in)
5881{
5882        struct dlm_lkb *lkb;
5883        struct dlm_args args;
5884        struct dlm_user_args *ua;
5885        int error;
5886
5887        dlm_lock_recovery(ls);
5888
5889        error = find_lkb(ls, lkid, &lkb);
5890        if (error)
5891                goto out;
5892
5893        ua = lkb->lkb_ua;
5894
5895        if (lvb_in && ua->lksb.sb_lvbptr)
5896                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5897        if (ua_tmp->castparam)
5898                ua->castparam = ua_tmp->castparam;
5899        ua->user_lksb = ua_tmp->user_lksb;
5900
5901        error = set_unlock_args(flags, ua, &args);
5902        if (error)
5903                goto out_put;
5904
5905        error = unlock_lock(ls, lkb, &args);
5906
5907        if (error == -DLM_EUNLOCK)
5908                error = 0;
5909        /* from validate_unlock_args() */
5910        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5911                error = 0;
5912        if (error)
5913                goto out_put;
5914
5915        spin_lock(&ua->proc->locks_spin);
5916        /* dlm_user_add_cb() may have already taken lkb off the proc list */
5917        if (!list_empty(&lkb->lkb_ownqueue))
5918                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5919        spin_unlock(&ua->proc->locks_spin);
5920 out_put:
5921        dlm_put_lkb(lkb);
5922 out:
5923        dlm_unlock_recovery(ls);
5924        kfree(ua_tmp);
5925        return error;
5926}
5927
5928int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5929                    uint32_t flags, uint32_t lkid)
5930{
5931        struct dlm_lkb *lkb;
5932        struct dlm_args args;
5933        struct dlm_user_args *ua;
5934        int error;
5935
5936        dlm_lock_recovery(ls);
5937
5938        error = find_lkb(ls, lkid, &lkb);
5939        if (error)
5940                goto out;
5941
5942        ua = lkb->lkb_ua;
5943        if (ua_tmp->castparam)
5944                ua->castparam = ua_tmp->castparam;
5945        ua->user_lksb = ua_tmp->user_lksb;
5946
5947        error = set_unlock_args(flags, ua, &args);
5948        if (error)
5949                goto out_put;
5950
5951        error = cancel_lock(ls, lkb, &args);
5952
5953        if (error == -DLM_ECANCEL)
5954                error = 0;
5955        /* from validate_unlock_args() */
5956        if (error == -EBUSY)
5957                error = 0;
5958 out_put:
5959        dlm_put_lkb(lkb);
5960 out:
5961        dlm_unlock_recovery(ls);
5962        kfree(ua_tmp);
5963        return error;
5964}
5965
5966int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5967{
5968        struct dlm_lkb *lkb;
5969        struct dlm_args args;
5970        struct dlm_user_args *ua;
5971        struct dlm_rsb *r;
5972        int error;
5973
5974        dlm_lock_recovery(ls);
5975
5976        error = find_lkb(ls, lkid, &lkb);
5977        if (error)
5978                goto out;
5979
5980        ua = lkb->lkb_ua;
5981
5982        error = set_unlock_args(flags, ua, &args);
5983        if (error)
5984                goto out_put;
5985
5986        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5987
5988        r = lkb->lkb_resource;
5989        hold_rsb(r);
5990        lock_rsb(r);
5991
5992        error = validate_unlock_args(lkb, &args);
5993        if (error)
5994                goto out_r;
5995        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5996
5997        error = _cancel_lock(r, lkb);
5998 out_r:
5999        unlock_rsb(r);
6000        put_rsb(r);
6001
6002        if (error == -DLM_ECANCEL)
6003                error = 0;
6004        /* from validate_unlock_args() */
6005        if (error == -EBUSY)
6006                error = 0;
6007 out_put:
6008        dlm_put_lkb(lkb);
6009 out:
6010        dlm_unlock_recovery(ls);
6011        return error;
6012}
6013
6014/* lkb's that are removed from the waiters list by revert are just left on the
6015   orphans list with the granted orphan locks, to be freed by purge */
6016
6017static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6018{
6019        struct dlm_args args;
6020        int error;
6021
6022        hold_lkb(lkb);
6023        mutex_lock(&ls->ls_orphans_mutex);
6024        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6025        mutex_unlock(&ls->ls_orphans_mutex);
6026
6027        set_unlock_args(0, lkb->lkb_ua, &args);
6028
6029        error = cancel_lock(ls, lkb, &args);
6030        if (error == -DLM_ECANCEL)
6031                error = 0;
6032        return error;
6033}
6034
6035/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6036   granted.  Regardless of what rsb queue the lock is on, it's removed and
6037   freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6038   if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6039
6040static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6041{
6042        struct dlm_args args;
6043        int error;
6044
6045        set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6046                        lkb->lkb_ua, &args);
6047
6048        error = unlock_lock(ls, lkb, &args);
6049        if (error == -DLM_EUNLOCK)
6050                error = 0;
6051        return error;
6052}
6053
6054/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6055   (which does lock_rsb) due to deadlock with receiving a message that does
6056   lock_rsb followed by dlm_user_add_cb() */
6057
6058static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6059                                     struct dlm_user_proc *proc)
6060{
6061        struct dlm_lkb *lkb = NULL;
6062
6063        mutex_lock(&ls->ls_clear_proc_locks);
6064        if (list_empty(&proc->locks))
6065                goto out;
6066
6067        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6068        list_del_init(&lkb->lkb_ownqueue);
6069
6070        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6071                lkb->lkb_flags |= DLM_IFL_ORPHAN;
6072        else
6073                lkb->lkb_flags |= DLM_IFL_DEAD;
6074 out:
6075        mutex_unlock(&ls->ls_clear_proc_locks);
6076        return lkb;
6077}
6078
6079/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6080   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6081   which we clear here. */
6082
6083/* proc CLOSING flag is set so no more device_reads should look at proc->asts
6084   list, and no more device_writes should add lkb's to proc->locks list; so we
6085   shouldn't need to take asts_spin or locks_spin here.  this assumes that
6086   device reads/writes/closes are serialized -- FIXME: we may need to serialize
6087   them ourself. */
6088
6089void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6090{
6091        struct dlm_lkb *lkb, *safe;
6092
6093        dlm_lock_recovery(ls);
6094
6095        while (1) {
6096                lkb = del_proc_lock(ls, proc);
6097                if (!lkb)
6098                        break;
6099                del_timeout(lkb);
6100                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6101                        orphan_proc_lock(ls, lkb);
6102                else
6103                        unlock_proc_lock(ls, lkb);
6104
6105                /* this removes the reference for the proc->locks list
6106                   added by dlm_user_request, it may result in the lkb
6107                   being freed */
6108
6109                dlm_put_lkb(lkb);
6110        }
6111
6112        mutex_lock(&ls->ls_clear_proc_locks);
6113
6114        /* in-progress unlocks */
6115        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6116                list_del_init(&lkb->lkb_ownqueue);
6117                lkb->lkb_flags |= DLM_IFL_DEAD;
6118                dlm_put_lkb(lkb);
6119        }
6120
6121        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6122                memset(&lkb->lkb_callbacks, 0,
6123                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6124                list_del_init(&lkb->lkb_cb_list);
6125                dlm_put_lkb(lkb);
6126        }
6127
6128        mutex_unlock(&ls->ls_clear_proc_locks);
6129        dlm_unlock_recovery(ls);
6130}
6131
6132static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6133{
6134        struct dlm_lkb *lkb, *safe;
6135
6136        while (1) {
6137                lkb = NULL;
6138                spin_lock(&proc->locks_spin);
6139                if (!list_empty(&proc->locks)) {
6140                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
6141                                         lkb_ownqueue);
6142                        list_del_init(&lkb->lkb_ownqueue);
6143                }
6144                spin_unlock(&proc->locks_spin);
6145
6146                if (!lkb)
6147                        break;
6148
6149                lkb->lkb_flags |= DLM_IFL_DEAD;
6150                unlock_proc_lock(ls, lkb);
6151                dlm_put_lkb(lkb); /* ref from proc->locks list */
6152        }
6153
6154        spin_lock(&proc->locks_spin);
6155        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6156                list_del_init(&lkb->lkb_ownqueue);
6157                lkb->lkb_flags |= DLM_IFL_DEAD;
6158                dlm_put_lkb(lkb);
6159        }
6160        spin_unlock(&proc->locks_spin);
6161
6162        spin_lock(&proc->asts_spin);
6163        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6164                memset(&lkb->lkb_callbacks, 0,
6165                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6166                list_del_init(&lkb->lkb_cb_list);
6167                dlm_put_lkb(lkb);
6168        }
6169        spin_unlock(&proc->asts_spin);
6170}
6171
6172/* pid of 0 means purge all orphans */
6173
6174static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6175{
6176        struct dlm_lkb *lkb, *safe;
6177
6178        mutex_lock(&ls->ls_orphans_mutex);
6179        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6180                if (pid && lkb->lkb_ownpid != pid)
6181                        continue;
6182                unlock_proc_lock(ls, lkb);
6183                list_del_init(&lkb->lkb_ownqueue);
6184                dlm_put_lkb(lkb);
6185        }
6186        mutex_unlock(&ls->ls_orphans_mutex);
6187}
6188
6189static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6190{
6191        struct dlm_message *ms;
6192        struct dlm_mhandle *mh;
6193        int error;
6194
6195        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6196                                DLM_MSG_PURGE, &ms, &mh);
6197        if (error)
6198                return error;
6199        ms->m_nodeid = nodeid;
6200        ms->m_pid = pid;
6201
6202        return send_message(mh, ms);
6203}
6204
6205int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6206                   int nodeid, int pid)
6207{
6208        int error = 0;
6209
6210        if (nodeid != dlm_our_nodeid()) {
6211                error = send_purge(ls, nodeid, pid);
6212        } else {
6213                dlm_lock_recovery(ls);
6214                if (pid == current->pid)
6215                        purge_proc_locks(ls, proc);
6216                else
6217                        do_purge(ls, nodeid, pid);
6218                dlm_unlock_recovery(ls);
6219        }
6220        return error;
6221}
6222
6223