linux/fs/dlm/lock.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13/* Central locking logic has four stages:
  14
  15   dlm_lock()
  16   dlm_unlock()
  17
  18   request_lock(ls, lkb)
  19   convert_lock(ls, lkb)
  20   unlock_lock(ls, lkb)
  21   cancel_lock(ls, lkb)
  22
  23   _request_lock(r, lkb)
  24   _convert_lock(r, lkb)
  25   _unlock_lock(r, lkb)
  26   _cancel_lock(r, lkb)
  27
  28   do_request(r, lkb)
  29   do_convert(r, lkb)
  30   do_unlock(r, lkb)
  31   do_cancel(r, lkb)
  32
  33   Stage 1 (lock, unlock) is mainly about checking input args and
  34   splitting into one of the four main operations:
  35
  36       dlm_lock          = request_lock
  37       dlm_lock+CONVERT  = convert_lock
  38       dlm_unlock        = unlock_lock
  39       dlm_unlock+CANCEL = cancel_lock
  40
  41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
  42   provided to the next stage.
  43
  44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
  45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
  46
  47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
  48   given rsb and lkb and queues callbacks.
  49
  50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
  51   function being executed on the remote node.  The connecting send/receive
  52   calls on local (L) and remote (R) nodes:
  53
  54   L: send_xxxx()              ->  R: receive_xxxx()
  55                                   R: do_xxxx()
  56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
  57*/
  58#include <linux/types.h>
  59#include <linux/slab.h>
  60#include "dlm_internal.h"
  61#include <linux/dlm_device.h>
  62#include "memory.h"
  63#include "lowcomms.h"
  64#include "requestqueue.h"
  65#include "util.h"
  66#include "dir.h"
  67#include "member.h"
  68#include "lockspace.h"
  69#include "ast.h"
  70#include "lock.h"
  71#include "rcom.h"
  72#include "recover.h"
  73#include "lvb_table.h"
  74#include "user.h"
  75#include "config.h"
  76
  77static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
  78static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
  79static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  80static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
  81static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
  82static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
  83static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
  84static int send_remove(struct dlm_rsb *r);
  85static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  86static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  87static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
  88                                    struct dlm_message *ms);
  89static int receive_extralen(struct dlm_message *ms);
  90static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
  91static void del_timeout(struct dlm_lkb *lkb);
  92
  93/*
   94 * Lock compatibility matrix - thanks Steve
  95 * UN = Unlocked state. Not really a state, used as a flag
  96 * PD = Padding. Used to make the matrix a nice power of two in size
  97 * Other states are the same as the VMS DLM.
  98 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
  99 */
 100
 101static const int __dlm_compat_matrix[8][8] = {
 102      /* UN NL CR CW PR PW EX PD */
 103        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
 104        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
 105        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
 106        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
 107        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
 108        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
 109        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
 110        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 111};
 112
 113/*
 114 * This defines the direction of transfer of LVB data.
 115 * Granted mode is the row; requested mode is the column.
 116 * Usage: matrix[grmode+1][rqmode+1]
 117 * 1 = LVB is returned to the caller
 118 * 0 = LVB is written to the resource
 119 * -1 = nothing happens to the LVB
 120 */
 121
 122const int dlm_lvb_operations[8][8] = {
 123        /* UN   NL  CR  CW  PR  PW  EX  PD*/
 124        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
 125        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
 126        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
 127        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
 128        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
 129        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
 130        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
 131        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
 132};
 133
 134#define modes_compat(gr, rq) \
 135        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
 136
 137int dlm_modes_compat(int mode1, int mode2)
 138{
 139        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 140}
 141
 142/*
 143 * Compatibility matrix for conversions with QUECVT set.
 144 * Granted mode is the row; requested mode is the column.
 145 * Usage: matrix[grmode+1][rqmode+1]
 146 */
 147
 148static const int __quecvt_compat_matrix[8][8] = {
 149      /* UN NL CR CW PR PW EX PD */
 150        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
 151        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
 152        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
 153        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
 154        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
 155        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
 156        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
 157        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 158};
 159
 160void dlm_print_lkb(struct dlm_lkb *lkb)
 161{
 162        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
 163               "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
 164               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 165               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
 166               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
 167}
 168
 169static void dlm_print_rsb(struct dlm_rsb *r)
 170{
 171        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
 172               r->res_nodeid, r->res_flags, r->res_first_lkid,
 173               r->res_recover_locks_count, r->res_name);
 174}
 175
 176void dlm_dump_rsb(struct dlm_rsb *r)
 177{
 178        struct dlm_lkb *lkb;
 179
 180        dlm_print_rsb(r);
 181
 182        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 183               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
 184        printk(KERN_ERR "rsb lookup list\n");
 185        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
 186                dlm_print_lkb(lkb);
 187        printk(KERN_ERR "rsb grant queue:\n");
 188        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
 189                dlm_print_lkb(lkb);
 190        printk(KERN_ERR "rsb convert queue:\n");
 191        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
 192                dlm_print_lkb(lkb);
 193        printk(KERN_ERR "rsb wait queue:\n");
 194        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
 195                dlm_print_lkb(lkb);
 196}
 197
 198/* Threads cannot use the lockspace while it's being recovered */
 199
 200static inline void dlm_lock_recovery(struct dlm_ls *ls)
 201{
 202        down_read(&ls->ls_in_recovery);
 203}
 204
 205void dlm_unlock_recovery(struct dlm_ls *ls)
 206{
 207        up_read(&ls->ls_in_recovery);
 208}
 209
 210int dlm_lock_recovery_try(struct dlm_ls *ls)
 211{
 212        return down_read_trylock(&ls->ls_in_recovery);
 213}
 214
 215static inline int can_be_queued(struct dlm_lkb *lkb)
 216{
 217        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
 218}
 219
 220static inline int force_blocking_asts(struct dlm_lkb *lkb)
 221{
 222        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
 223}
 224
 225static inline int is_demoted(struct dlm_lkb *lkb)
 226{
 227        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 228}
 229
 230static inline int is_altmode(struct dlm_lkb *lkb)
 231{
 232        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
 233}
 234
 235static inline int is_granted(struct dlm_lkb *lkb)
 236{
 237        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
 238}
 239
 240static inline int is_remote(struct dlm_rsb *r)
 241{
 242        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
 243        return !!r->res_nodeid;
 244}
 245
 246static inline int is_process_copy(struct dlm_lkb *lkb)
 247{
 248        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
 249}
 250
 251static inline int is_master_copy(struct dlm_lkb *lkb)
 252{
 253        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
 254                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
 255        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 256}
 257
 258static inline int middle_conversion(struct dlm_lkb *lkb)
 259{
 260        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
 261            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
 262                return 1;
 263        return 0;
 264}
 265
 266static inline int down_conversion(struct dlm_lkb *lkb)
 267{
 268        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 269}
 270
 271static inline int is_overlap_unlock(struct dlm_lkb *lkb)
 272{
 273        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
 274}
 275
 276static inline int is_overlap_cancel(struct dlm_lkb *lkb)
 277{
 278        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
 279}
 280
 281static inline int is_overlap(struct dlm_lkb *lkb)
 282{
 283        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
 284                                  DLM_IFL_OVERLAP_CANCEL));
 285}
 286
 287static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 288{
 289        if (is_master_copy(lkb))
 290                return;
 291
 292        del_timeout(lkb);
 293
 294        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 295
 296        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
 297           timeout caused the cancel then return -ETIMEDOUT */
 298        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
 299                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
 300                rv = -ETIMEDOUT;
 301        }
 302
 303        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
 304                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
 305                rv = -EDEADLK;
 306        }
 307
 308        lkb->lkb_lksb->sb_status = rv;
 309        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 310
 311        dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
 312}
 313
 314static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 315{
 316        queue_cast(r, lkb,
 317                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
 318}
 319
 320static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 321{
 322        lkb->lkb_time_bast = ktime_get();
 323
 324        if (is_master_copy(lkb)) {
 325                lkb->lkb_bastmode = rqmode; /* printed by debugfs */
 326                send_bast(r, lkb, rqmode);
 327        } else {
 328                dlm_add_ast(lkb, AST_BAST, rqmode);
 329        }
 330}
 331
 332/*
 333 * Basic operations on rsb's and lkb's
 334 */
 335
 336static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
 337{
 338        struct dlm_rsb *r;
 339
 340        r = dlm_allocate_rsb(ls, len);
 341        if (!r)
 342                return NULL;
 343
 344        r->res_ls = ls;
 345        r->res_length = len;
 346        memcpy(r->res_name, name, len);
 347        mutex_init(&r->res_mutex);
 348
 349        INIT_LIST_HEAD(&r->res_lookup);
 350        INIT_LIST_HEAD(&r->res_grantqueue);
 351        INIT_LIST_HEAD(&r->res_convertqueue);
 352        INIT_LIST_HEAD(&r->res_waitqueue);
 353        INIT_LIST_HEAD(&r->res_root_list);
 354        INIT_LIST_HEAD(&r->res_recover_list);
 355
 356        return r;
 357}
 358
 359static int search_rsb_list(struct list_head *head, char *name, int len,
 360                           unsigned int flags, struct dlm_rsb **r_ret)
 361{
 362        struct dlm_rsb *r;
 363        int error = 0;
 364
 365        list_for_each_entry(r, head, res_hashchain) {
 366                if (len == r->res_length && !memcmp(name, r->res_name, len))
 367                        goto found;
 368        }
 369        *r_ret = NULL;
 370        return -EBADR;
 371
 372 found:
 373        if (r->res_nodeid && (flags & R_MASTER))
 374                error = -ENOTBLK;
 375        *r_ret = r;
 376        return error;
 377}
 378
 379static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 380                       unsigned int flags, struct dlm_rsb **r_ret)
 381{
 382        struct dlm_rsb *r;
 383        int error;
 384
 385        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
 386        if (!error) {
 387                kref_get(&r->res_ref);
 388                goto out;
 389        }
 390        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 391        if (error)
 392                goto out;
 393
 394        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
 395
 396        if (dlm_no_directory(ls))
 397                goto out;
 398
 399        if (r->res_nodeid == -1) {
 400                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 401                r->res_first_lkid = 0;
 402        } else if (r->res_nodeid > 0) {
 403                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 404                r->res_first_lkid = 0;
 405        } else {
 406                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
 407                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
 408        }
 409 out:
 410        *r_ret = r;
 411        return error;
 412}
 413
 414static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 415                      unsigned int flags, struct dlm_rsb **r_ret)
 416{
 417        int error;
 418        spin_lock(&ls->ls_rsbtbl[b].lock);
 419        error = _search_rsb(ls, name, len, b, flags, r_ret);
 420        spin_unlock(&ls->ls_rsbtbl[b].lock);
 421        return error;
 422}
 423
 424/*
 425 * Find rsb in rsbtbl and potentially create/add one
 426 *
 427 * Delaying the release of rsb's has a similar benefit to applications keeping
 428 * NL locks on an rsb, but without the guarantee that the cached master value
 429 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 430 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 431 * to excessive master lookups and removals if we don't delay the release.
 432 *
 433 * Searching for an rsb means looking through both the normal list and toss
 434 * list.  When found on the toss list the rsb is moved to the normal list with
 435 * ref count of 1; when found on normal list the ref count is incremented.
 436 */
 437
 438static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 439                    unsigned int flags, struct dlm_rsb **r_ret)
 440{
 441        struct dlm_rsb *r = NULL, *tmp;
 442        uint32_t hash, bucket;
 443        int error = -EINVAL;
 444
 445        if (namelen > DLM_RESNAME_MAXLEN)
 446                goto out;
 447
 448        if (dlm_no_directory(ls))
 449                flags |= R_CREATE;
 450
 451        error = 0;
 452        hash = jhash(name, namelen, 0);
 453        bucket = hash & (ls->ls_rsbtbl_size - 1);
 454
 455        error = search_rsb(ls, name, namelen, bucket, flags, &r);
 456        if (!error)
 457                goto out;
 458
 459        if (error == -EBADR && !(flags & R_CREATE))
 460                goto out;
 461
 462        /* the rsb was found but wasn't a master copy */
 463        if (error == -ENOTBLK)
 464                goto out;
 465
 466        error = -ENOMEM;
 467        r = create_rsb(ls, name, namelen);
 468        if (!r)
 469                goto out;
 470
 471        r->res_hash = hash;
 472        r->res_bucket = bucket;
 473        r->res_nodeid = -1;
 474        kref_init(&r->res_ref);
 475
 476        /* With no directory, the master can be set immediately */
 477        if (dlm_no_directory(ls)) {
 478                int nodeid = dlm_dir_nodeid(r);
 479                if (nodeid == dlm_our_nodeid())
 480                        nodeid = 0;
 481                r->res_nodeid = nodeid;
 482        }
 483
 484        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 485        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
 486        if (!error) {
 487                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 488                dlm_free_rsb(r);
 489                r = tmp;
 490                goto out;
 491        }
 492        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
 493        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 494        error = 0;
 495 out:
 496        *r_ret = r;
 497        return error;
 498}
 499
 500/* This is only called to add a reference when the code already holds
 501   a valid reference to the rsb, so there's no need for locking. */
 502
 503static inline void hold_rsb(struct dlm_rsb *r)
 504{
 505        kref_get(&r->res_ref);
 506}
 507
 508void dlm_hold_rsb(struct dlm_rsb *r)
 509{
 510        hold_rsb(r);
 511}
 512
 513static void toss_rsb(struct kref *kref)
 514{
 515        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 516        struct dlm_ls *ls = r->res_ls;
 517
 518        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 519        kref_init(&r->res_ref);
 520        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
 521        r->res_toss_time = jiffies;
 522        if (r->res_lvbptr) {
 523                dlm_free_lvb(r->res_lvbptr);
 524                r->res_lvbptr = NULL;
 525        }
 526}
 527
  528/* When all references to the rsb are gone it's transferred to
 529   the tossed list for later disposal. */
 530
 531static void put_rsb(struct dlm_rsb *r)
 532{
 533        struct dlm_ls *ls = r->res_ls;
 534        uint32_t bucket = r->res_bucket;
 535
 536        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 537        kref_put(&r->res_ref, toss_rsb);
 538        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 539}
 540
 541void dlm_put_rsb(struct dlm_rsb *r)
 542{
 543        put_rsb(r);
 544}
 545
 546/* See comment for unhold_lkb */
 547
 548static void unhold_rsb(struct dlm_rsb *r)
 549{
 550        int rv;
 551        rv = kref_put(&r->res_ref, toss_rsb);
 552        DLM_ASSERT(!rv, dlm_dump_rsb(r););
 553}
 554
 555static void kill_rsb(struct kref *kref)
 556{
 557        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 558
 559        /* All work is done after the return from kref_put() so we
 560           can release the write_lock before the remove and free. */
 561
 562        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 563        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 564        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
 565        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 566        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 567        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 568}
 569
 570/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
 571   The rsb must exist as long as any lkb's for it do. */
 572
 573static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 574{
 575        hold_rsb(r);
 576        lkb->lkb_resource = r;
 577}
 578
 579static void detach_lkb(struct dlm_lkb *lkb)
 580{
 581        if (lkb->lkb_resource) {
 582                put_rsb(lkb->lkb_resource);
 583                lkb->lkb_resource = NULL;
 584        }
 585}
 586
 587static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 588{
 589        struct dlm_lkb *lkb, *tmp;
 590        uint32_t lkid = 0;
 591        uint16_t bucket;
 592
 593        lkb = dlm_allocate_lkb(ls);
 594        if (!lkb)
 595                return -ENOMEM;
 596
 597        lkb->lkb_nodeid = -1;
 598        lkb->lkb_grmode = DLM_LOCK_IV;
 599        kref_init(&lkb->lkb_ref);
 600        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 601        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 602        INIT_LIST_HEAD(&lkb->lkb_time_list);
 603
 604        get_random_bytes(&bucket, sizeof(bucket));
 605        bucket &= (ls->ls_lkbtbl_size - 1);
 606
 607        write_lock(&ls->ls_lkbtbl[bucket].lock);
 608
 609        /* counter can roll over so we must verify lkid is not in use */
 610
 611        while (lkid == 0) {
 612                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
 613
 614                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
 615                                    lkb_idtbl_list) {
 616                        if (tmp->lkb_id != lkid)
 617                                continue;
 618                        lkid = 0;
 619                        break;
 620                }
 621        }
 622
 623        lkb->lkb_id = lkid;
 624        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
 625        write_unlock(&ls->ls_lkbtbl[bucket].lock);
 626
 627        *lkb_ret = lkb;
 628        return 0;
 629}
 630
 631static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
 632{
 633        struct dlm_lkb *lkb;
 634        uint16_t bucket = (lkid >> 16);
 635
 636        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
 637                if (lkb->lkb_id == lkid)
 638                        return lkb;
 639        }
 640        return NULL;
 641}
 642
 643static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 644{
 645        struct dlm_lkb *lkb;
 646        uint16_t bucket = (lkid >> 16);
 647
 648        if (bucket >= ls->ls_lkbtbl_size)
 649                return -EBADSLT;
 650
 651        read_lock(&ls->ls_lkbtbl[bucket].lock);
 652        lkb = __find_lkb(ls, lkid);
 653        if (lkb)
 654                kref_get(&lkb->lkb_ref);
 655        read_unlock(&ls->ls_lkbtbl[bucket].lock);
 656
 657        *lkb_ret = lkb;
 658        return lkb ? 0 : -ENOENT;
 659}
 660
 661static void kill_lkb(struct kref *kref)
 662{
 663        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
 664
 665        /* All work is done after the return from kref_put() so we
 666           can release the write_lock before the detach_lkb */
 667
 668        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 669}
 670
 671/* __put_lkb() is used when an lkb may not have an rsb attached to
 672   it so we need to provide the lockspace explicitly */
 673
 674static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 675{
 676        uint16_t bucket = (lkb->lkb_id >> 16);
 677
 678        write_lock(&ls->ls_lkbtbl[bucket].lock);
 679        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
 680                list_del(&lkb->lkb_idtbl_list);
 681                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 682
 683                detach_lkb(lkb);
 684
 685                /* for local/process lkbs, lvbptr points to caller's lksb */
 686                if (lkb->lkb_lvbptr && is_master_copy(lkb))
 687                        dlm_free_lvb(lkb->lkb_lvbptr);
 688                dlm_free_lkb(lkb);
 689                return 1;
 690        } else {
 691                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 692                return 0;
 693        }
 694}
 695
 696int dlm_put_lkb(struct dlm_lkb *lkb)
 697{
 698        struct dlm_ls *ls;
 699
 700        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
 701        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
 702
 703        ls = lkb->lkb_resource->res_ls;
 704        return __put_lkb(ls, lkb);
 705}
 706
 707/* This is only called to add a reference when the code already holds
 708   a valid reference to the lkb, so there's no need for locking. */
 709
 710static inline void hold_lkb(struct dlm_lkb *lkb)
 711{
 712        kref_get(&lkb->lkb_ref);
 713}
 714
 715/* This is called when we need to remove a reference and are certain
 716   it's not the last ref.  e.g. del_lkb is always called between a
 717   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
 718   put_lkb would work fine, but would involve unnecessary locking */
 719
 720static inline void unhold_lkb(struct dlm_lkb *lkb)
 721{
 722        int rv;
 723        rv = kref_put(&lkb->lkb_ref, kill_lkb);
 724        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
 725}
 726
 727static void lkb_add_ordered(struct list_head *new, struct list_head *head,
 728                            int mode)
 729{
 730        struct dlm_lkb *lkb = NULL;
 731
 732        list_for_each_entry(lkb, head, lkb_statequeue)
 733                if (lkb->lkb_rqmode < mode)
 734                        break;
 735
 736        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
 737}
 738
 739/* add/remove lkb to rsb's grant/convert/wait queue */
 740
 741static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 742{
 743        kref_get(&lkb->lkb_ref);
 744
 745        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 746
 747        lkb->lkb_timestamp = ktime_get();
 748
 749        lkb->lkb_status = status;
 750
 751        switch (status) {
 752        case DLM_LKSTS_WAITING:
 753                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 754                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
 755                else
 756                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
 757                break;
 758        case DLM_LKSTS_GRANTED:
 759                /* convention says granted locks kept in order of grmode */
 760                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 761                                lkb->lkb_grmode);
 762                break;
 763        case DLM_LKSTS_CONVERT:
 764                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 765                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
 766                else
 767                        list_add_tail(&lkb->lkb_statequeue,
 768                                      &r->res_convertqueue);
 769                break;
 770        default:
 771                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
 772        }
 773}
 774
 775static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 776{
 777        lkb->lkb_status = 0;
 778        list_del(&lkb->lkb_statequeue);
 779        unhold_lkb(lkb);
 780}
 781
 782static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
 783{
 784        hold_lkb(lkb);
 785        del_lkb(r, lkb);
 786        add_lkb(r, lkb, sts);
 787        unhold_lkb(lkb);
 788}
 789
 790static int msg_reply_type(int mstype)
 791{
 792        switch (mstype) {
 793        case DLM_MSG_REQUEST:
 794                return DLM_MSG_REQUEST_REPLY;
 795        case DLM_MSG_CONVERT:
 796                return DLM_MSG_CONVERT_REPLY;
 797        case DLM_MSG_UNLOCK:
 798                return DLM_MSG_UNLOCK_REPLY;
 799        case DLM_MSG_CANCEL:
 800                return DLM_MSG_CANCEL_REPLY;
 801        case DLM_MSG_LOOKUP:
 802                return DLM_MSG_LOOKUP_REPLY;
 803        }
 804        return -1;
 805}
 806
 807/* add/remove lkb from global waiters list of lkb's waiting for
 808   a reply from a remote node */
 809
 810static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 811{
 812        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 813        int error = 0;
 814
 815        mutex_lock(&ls->ls_waiters_mutex);
 816
 817        if (is_overlap_unlock(lkb) ||
 818            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
 819                error = -EINVAL;
 820                goto out;
 821        }
 822
 823        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
 824                switch (mstype) {
 825                case DLM_MSG_UNLOCK:
 826                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
 827                        break;
 828                case DLM_MSG_CANCEL:
 829                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
 830                        break;
 831                default:
 832                        error = -EBUSY;
 833                        goto out;
 834                }
 835                lkb->lkb_wait_count++;
 836                hold_lkb(lkb);
 837
 838                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
 839                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
 840                          lkb->lkb_wait_count, lkb->lkb_flags);
 841                goto out;
 842        }
 843
 844        DLM_ASSERT(!lkb->lkb_wait_count,
 845                   dlm_print_lkb(lkb);
 846                   printk("wait_count %d\n", lkb->lkb_wait_count););
 847
 848        lkb->lkb_wait_count++;
 849        lkb->lkb_wait_type = mstype;
 850        hold_lkb(lkb);
 851        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 852 out:
 853        if (error)
 854                log_error(ls, "addwait error %x %d flags %x %d %d %s",
 855                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
 856                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 857        mutex_unlock(&ls->ls_waiters_mutex);
 858        return error;
 859}
 860
 861/* We clear the RESEND flag because we might be taking an lkb off the waiters
 862   list as part of process_requestqueue (e.g. a lookup that has an optimized
 863   request reply on the requestqueue) between dlm_recover_waiters_pre() which
 864   set RESEND and dlm_recover_waiters_post() */
 865
 866static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 867                                struct dlm_message *ms)
 868{
 869        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 870        int overlap_done = 0;
 871
 872        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
 873                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
 874                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
 875                overlap_done = 1;
 876                goto out_del;
 877        }
 878
 879        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
 880                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
 881                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 882                overlap_done = 1;
 883                goto out_del;
 884        }
 885
 886        /* Cancel state was preemptively cleared by a successful convert,
 887           see next comment, nothing to do. */
 888
 889        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
 890            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
 891                log_debug(ls, "remwait %x cancel_reply wait_type %d",
 892                          lkb->lkb_id, lkb->lkb_wait_type);
 893                return -1;
 894        }
 895
 896        /* Remove for the convert reply, and premptively remove for the
 897           cancel reply.  A convert has been granted while there's still
 898           an outstanding cancel on it (the cancel is moot and the result
 899           in the cancel reply should be 0).  We preempt the cancel reply
 900           because the app gets the convert result and then can follow up
 901           with another op, like convert.  This subsequent op would see the
 902           lingering state of the cancel and fail with -EBUSY. */
 903
 904        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
 905            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
 906            is_overlap_cancel(lkb) && ms && !ms->m_result) {
 907                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
 908                          lkb->lkb_id);
 909                lkb->lkb_wait_type = 0;
 910                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 911                lkb->lkb_wait_count--;
 912                goto out_del;
 913        }
 914
 915        /* N.B. type of reply may not always correspond to type of original
 916           msg due to lookup->request optimization, verify others? */
 917
 918        if (lkb->lkb_wait_type) {
 919                lkb->lkb_wait_type = 0;
 920                goto out_del;
 921        }
 922
 923        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
 924                  lkb->lkb_id, mstype, lkb->lkb_flags);
 925        return -1;
 926
 927 out_del:
 928        /* the force-unlock/cancel has completed and we haven't recvd a reply
 929           to the op that was in progress prior to the unlock/cancel; we
 930           give up on any reply to the earlier op.  FIXME: not sure when/how
 931           this would happen */
 932
 933        if (overlap_done && lkb->lkb_wait_type) {
 934                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
 935                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
 936                lkb->lkb_wait_count--;
 937                lkb->lkb_wait_type = 0;
 938        }
 939
 940        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
 941
 942        lkb->lkb_flags &= ~DLM_IFL_RESEND;
 943        lkb->lkb_wait_count--;
 944        if (!lkb->lkb_wait_count)
 945                list_del_init(&lkb->lkb_wait_reply);
 946        unhold_lkb(lkb);
 947        return 0;
 948}
 949
 950static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 951{
 952        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 953        int error;
 954
 955        mutex_lock(&ls->ls_waiters_mutex);
 956        error = _remove_from_waiters(lkb, mstype, NULL);
 957        mutex_unlock(&ls->ls_waiters_mutex);
 958        return error;
 959}
 960
 961/* Handles situations where we might be processing a "fake" or "stub" reply in
 962   which we can't try to take waiters_mutex again. */
 963
 964static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 965{
 966        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 967        int error;
 968
 969        if (ms != &ls->ls_stub_ms)
 970                mutex_lock(&ls->ls_waiters_mutex);
 971        error = _remove_from_waiters(lkb, ms->m_type, ms);
 972        if (ms != &ls->ls_stub_ms)
 973                mutex_unlock(&ls->ls_waiters_mutex);
 974        return error;
 975}
 976
 977static void dir_remove(struct dlm_rsb *r)
 978{
 979        int to_nodeid;
 980
 981        if (dlm_no_directory(r->res_ls))
 982                return;
 983
 984        to_nodeid = dlm_dir_nodeid(r);
 985        if (to_nodeid != dlm_our_nodeid())
 986                send_remove(r);
 987        else
 988                dlm_dir_remove_entry(r->res_ls, to_nodeid,
 989                                     r->res_name, r->res_length);
 990}
 991
 992/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
 993   found since they are in order of newest to oldest? */
 994
 995static int shrink_bucket(struct dlm_ls *ls, int b)
 996{
 997        struct dlm_rsb *r;
 998        int count = 0, found;
 999
1000        for (;;) {
1001                found = 0;
1002                spin_lock(&ls->ls_rsbtbl[b].lock);
1003                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                            res_hashchain) {
1005                        if (!time_after_eq(jiffies, r->res_toss_time +
1006                                           dlm_config.ci_toss_secs * HZ))
1007                                continue;
1008                        found = 1;
1009                        break;
1010                }
1011
1012                if (!found) {
1013                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                        break;
1015                }
1016
1017                if (kref_put(&r->res_ref, kill_rsb)) {
1018                        list_del(&r->res_hashchain);
1019                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                        if (is_master(r))
1022                                dir_remove(r);
1023                        dlm_free_rsb(r);
1024                        count++;
1025                } else {
1026                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                        log_error(ls, "tossed rsb in use %s", r->res_name);
1028                }
1029        }
1030
1031        return count;
1032}
1033
1034void dlm_scan_rsbs(struct dlm_ls *ls)
1035{
1036        int i;
1037
1038        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                shrink_bucket(ls, i);
1040                if (dlm_locking_stopped(ls))
1041                        break;
1042                cond_resched();
1043        }
1044}
1045
1046static void add_timeout(struct dlm_lkb *lkb)
1047{
1048        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050        if (is_master_copy(lkb))
1051                return;
1052
1053        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                goto add_it;
1057        }
1058        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                goto add_it;
1060        return;
1061
1062 add_it:
1063        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064        mutex_lock(&ls->ls_timeout_mutex);
1065        hold_lkb(lkb);
1066        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067        mutex_unlock(&ls->ls_timeout_mutex);
1068}
1069
1070static void del_timeout(struct dlm_lkb *lkb)
1071{
1072        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074        mutex_lock(&ls->ls_timeout_mutex);
1075        if (!list_empty(&lkb->lkb_time_list)) {
1076                list_del_init(&lkb->lkb_time_list);
1077                unhold_lkb(lkb);
1078        }
1079        mutex_unlock(&ls->ls_timeout_mutex);
1080}
1081
1082/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084   and then lock rsb because of lock ordering in add_timeout.  We may need
1085   to specify some special timeout-related bits in the lkb that are just to
1086   be accessed under the timeout_mutex. */
1087
1088void dlm_scan_timeout(struct dlm_ls *ls)
1089{
1090        struct dlm_rsb *r;
1091        struct dlm_lkb *lkb;
1092        int do_cancel, do_warn;
1093        s64 wait_us;
1094
1095        for (;;) {
1096                if (dlm_locking_stopped(ls))
1097                        break;
1098
1099                do_cancel = 0;
1100                do_warn = 0;
1101                mutex_lock(&ls->ls_timeout_mutex);
1102                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                        lkb->lkb_timestamp));
1106
1107                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                            wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                do_cancel = 1;
1110
1111                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                do_warn = 1;
1114
1115                        if (!do_cancel && !do_warn)
1116                                continue;
1117                        hold_lkb(lkb);
1118                        break;
1119                }
1120                mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                if (!do_cancel && !do_warn)
1123                        break;
1124
1125                r = lkb->lkb_resource;
1126                hold_rsb(r);
1127                lock_rsb(r);
1128
1129                if (do_warn) {
1130                        /* clear flag so we only warn once */
1131                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                del_timeout(lkb);
1134                        dlm_timeout_warn(lkb);
1135                }
1136
1137                if (do_cancel) {
1138                        log_debug(ls, "timeout cancel %x node %d %s",
1139                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                        del_timeout(lkb);
1143                        _cancel_lock(r, lkb);
1144                }
1145
1146                unlock_rsb(r);
1147                unhold_rsb(r);
1148                dlm_put_lkb(lkb);
1149        }
1150}
1151
1152/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153   dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155void dlm_adjust_timeouts(struct dlm_ls *ls)
1156{
1157        struct dlm_lkb *lkb;
1158        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160        ls->ls_recover_begin = 0;
1161        mutex_lock(&ls->ls_timeout_mutex);
1162        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164        mutex_unlock(&ls->ls_timeout_mutex);
1165}
1166
1167/* lkb is master or local copy */
1168
1169static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170{
1171        int b, len = r->res_ls->ls_lvblen;
1172
1173        /* b=1 lvb returned to caller
1174           b=0 lvb written to rsb or invalidated
1175           b=-1 do nothing */
1176
1177        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179        if (b == 1) {
1180                if (!lkb->lkb_lvbptr)
1181                        return;
1182
1183                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                        return;
1185
1186                if (!r->res_lvbptr)
1187                        return;
1188
1189                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192        } else if (b == 0) {
1193                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                        rsb_set_flag(r, RSB_VALNOTVALID);
1195                        return;
1196                }
1197
1198                if (!lkb->lkb_lvbptr)
1199                        return;
1200
1201                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                        return;
1203
1204                if (!r->res_lvbptr)
1205                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                if (!r->res_lvbptr)
1208                        return;
1209
1210                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                r->res_lvbseq++;
1212                lkb->lkb_lvbseq = r->res_lvbseq;
1213                rsb_clear_flag(r, RSB_VALNOTVALID);
1214        }
1215
1216        if (rsb_flag(r, RSB_VALNOTVALID))
1217                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218}
1219
1220static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221{
1222        if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                return;
1224
1225        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                rsb_set_flag(r, RSB_VALNOTVALID);
1227                return;
1228        }
1229
1230        if (!lkb->lkb_lvbptr)
1231                return;
1232
1233        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                return;
1235
1236        if (!r->res_lvbptr)
1237                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239        if (!r->res_lvbptr)
1240                return;
1241
1242        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243        r->res_lvbseq++;
1244        rsb_clear_flag(r, RSB_VALNOTVALID);
1245}
1246
1247/* lkb is process copy (pc) */
1248
1249static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                            struct dlm_message *ms)
1251{
1252        int b;
1253
1254        if (!lkb->lkb_lvbptr)
1255                return;
1256
1257        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                return;
1259
1260        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261        if (b == 1) {
1262                int len = receive_extralen(ms);
1263                if (len > DLM_RESNAME_MAXLEN)
1264                        len = DLM_RESNAME_MAXLEN;
1265                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                lkb->lkb_lvbseq = ms->m_lvbseq;
1267        }
1268}
1269
1270/* Manipulate lkb's on rsb's convert/granted/waiting queues
1271   remove_lock -- used for unlock, removes lkb from granted
1272   revert_lock -- used for cancel, moves lkb from convert to granted
1273   grant_lock  -- used for request and convert, adds lkb to granted or
1274                  moves lkb from convert or waiting to granted
1275
1276   Each of these is used for master or local copy lkb's.  There is
1277   also a _pc() variation used to make the corresponding change on
1278   a process copy (pc) lkb. */
1279
1280static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281{
1282        del_lkb(r, lkb);
1283        lkb->lkb_grmode = DLM_LOCK_IV;
1284        /* this unhold undoes the original ref from create_lkb()
1285           so this leads to the lkb being freed */
1286        unhold_lkb(lkb);
1287}
1288
/* Unlock for a master or local copy lkb: the lvb is handled first, while
   the lkb still carries its granted mode, and then the lkb is removed. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}
1294
/* Unlock for a process copy lkb: same removal as remove_lock() but with
   no lvb handling. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1299
1300/* returns: 0 did nothing
1301            1 moved lock to granted
1302           -1 removed lock */
1303
1304static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305{
1306        int rv = 0;
1307
1308        lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310        switch (lkb->lkb_status) {
1311        case DLM_LKSTS_GRANTED:
1312                break;
1313        case DLM_LKSTS_CONVERT:
1314                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                rv = 1;
1316                break;
1317        case DLM_LKSTS_WAITING:
1318                del_lkb(r, lkb);
1319                lkb->lkb_grmode = DLM_LOCK_IV;
1320                /* this unhold undoes the original ref from create_lkb()
1321                   so this leads to the lkb being freed */
1322                unhold_lkb(lkb);
1323                rv = -1;
1324                break;
1325        default:
1326                log_print("invalid status for revert %d", lkb->lkb_status);
1327        }
1328        return rv;
1329}
1330
/* Process copy variant of revert_lock(); currently identical, it simply
   forwards and returns the same 0 / 1 / -1 result. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = revert_lock(r, lkb);

        return rv;
}
1335
1336static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337{
1338        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                lkb->lkb_grmode = lkb->lkb_rqmode;
1340                if (lkb->lkb_status)
1341                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                else
1343                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344        }
1345
1346        lkb->lkb_rqmode = DLM_LOCK_IV;
1347}
1348
1349static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350{
1351        set_lvb_lock(r, lkb);
1352        _grant_lock(r, lkb);
1353        lkb->lkb_highbast = 0;
1354}
1355
/* Grant for a process copy lkb: any lvb comes from the reply message ms
   rather than from the rsb, and is taken before the modes change. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);

        _grant_lock(r, lkb);
}
1362
1363/* called by grant_pending_locks() which means an async grant message must
1364   be sent to the requesting node in addition to granting the lock if the
1365   lkb belongs to a remote node. */
1366
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        /* not a master copy: complete it here by queueing the cast with
           status 0 */
        if (!is_master_copy(lkb)) {
                queue_cast(r, lkb, 0);
                return;
        }

        /* master copy: send the grant message */
        send_grant(r, lkb);
}
1375
1376/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377   change the granted/requested modes.  We're munging things accordingly in
1378   the process copy.
1379   CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380   conversion deadlock
1381   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382   compatible with other granted locks */
1383
1384static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385{
1386        if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                log_print("munge_demoted %x invalid reply type %d",
1388                          lkb->lkb_id, ms->m_type);
1389                return;
1390        }
1391
1392        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                return;
1396        }
1397
1398        lkb->lkb_grmode = DLM_LOCK_NL;
1399}
1400
1401static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402{
1403        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404            ms->m_type != DLM_MSG_GRANT) {
1405                log_print("munge_altmode %x invalid reply type %d",
1406                          lkb->lkb_id, ms->m_type);
1407                return;
1408        }
1409
1410        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                lkb->lkb_rqmode = DLM_LOCK_PR;
1412        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                lkb->lkb_rqmode = DLM_LOCK_CW;
1414        else {
1415                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                dlm_print_lkb(lkb);
1417        }
1418}
1419
1420static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421{
1422        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                           lkb_statequeue);
1424        if (lkb->lkb_id == first->lkb_id)
1425                return 1;
1426
1427        return 0;
1428}
1429
1430/* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433{
1434        struct dlm_lkb *this;
1435
1436        list_for_each_entry(this, head, lkb_statequeue) {
1437                if (this == lkb)
1438                        continue;
1439                if (!modes_compat(this, lkb))
1440                        return 1;
1441        }
1442        return 0;
1443}
1444
1445/*
1446 * "A conversion deadlock arises with a pair of lock requests in the converting
1447 * queue for one resource.  The granted mode of each lock blocks the requested
1448 * mode of the other lock."
1449 *
1450 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451 * convert queue from being granted, then deadlk/demote lkb.
1452 *
1453 * Example:
1454 * Granted Queue: empty
1455 * Convert Queue: NL->EX (first lock)
1456 *                PR->EX (second lock)
1457 *
1458 * The first lock can't be granted because of the granted mode of the second
1459 * lock and the second lock can't be granted because it's not first in the
1460 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462 * flag set and return DEMOTED in the lksb flags.
1463 *
1464 * Originally, this function detected conv-deadlk in a more limited scope:
1465 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466 * - if lkb1 was the first entry in the queue (not just earlier), and was
1467 *   blocked by the granted mode of lkb2, and there was nothing on the
1468 *   granted queue preventing lkb1 from being granted immediately, i.e.
1469 *   lkb2 was the only thing preventing lkb1 from being granted.
1470 *
1471 * That second condition meant we'd only say there was conv-deadlk if
1472 * resolving it (by demotion) would lead to the first lock on the convert
1473 * queue being granted right away.  It allowed conversion deadlocks to exist
1474 * between locks on the convert queue while they couldn't be granted anyway.
1475 *
1476 * Now, we detect and take action on conversion deadlocks immediately when
1477 * they're created, even if they may not be immediately consequential.  If
1478 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479 * mode that would prevent lkb1's conversion from being granted, we do a
1480 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481 * I think this means that the lkb_is_ahead condition below should always
1482 * be zero, i.e. there will never be conv-deadlk between two locks that are
1483 * both already on the convert queue.
1484 */
1485
1486static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487{
1488        struct dlm_lkb *lkb1;
1489        int lkb_is_ahead = 0;
1490
1491        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                if (lkb1 == lkb2) {
1493                        lkb_is_ahead = 1;
1494                        continue;
1495                }
1496
1497                if (!lkb_is_ahead) {
1498                        if (!modes_compat(lkb2, lkb1))
1499                                return 1;
1500                } else {
1501                        if (!modes_compat(lkb2, lkb1) &&
1502                            !modes_compat(lkb1, lkb2))
1503                                return 1;
1504                }
1505        }
1506        return 0;
1507}
1508
1509/*
1510 * Return 1 if the lock can be granted, 0 otherwise.
1511 * Also detect and resolve conversion deadlocks.
1512 *
1513 * lkb is the lock to be granted
1514 *
1515 * now is 1 if the function is being called in the context of the
1516 * immediate request, it is 0 if called later, after the lock has been
1517 * queued.
1518 *
1519 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520 */
1521
1522static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523{
1524        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526        /*
1527         * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528         * a new request for a NL mode lock being blocked.
1529         *
1530         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531         * request, then it would be granted.  In essence, the use of this flag
1532         * tells the Lock Manager to expedite theis request by not considering
1533         * what may be in the CONVERTING or WAITING queues...  As of this
1534         * writing, the EXPEDITE flag can be used only with new requests for NL
1535         * mode locks.  This flag is not valid for conversion requests.
1536         *
1537         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538         * conversion or used with a non-NL requested mode.  We also know an
1539         * EXPEDITE request is always granted immediately, so now must always
1540         * be 1.  The full condition to grant an expedite request: (now &&
1541         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542         * therefore be shortened to just checking the flag.
1543         */
1544
1545        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                return 1;
1547
1548        /*
1549         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550         * added to the remaining conditions.
1551         */
1552
1553        if (queue_conflict(&r->res_grantqueue, lkb))
1554                goto out;
1555
1556        /*
1557         * 6-3: By default, a conversion request is immediately granted if the
1558         * requested mode is compatible with the modes of all other granted
1559         * locks
1560         */
1561
1562        if (queue_conflict(&r->res_convertqueue, lkb))
1563                goto out;
1564
1565        /*
1566         * 6-5: But the default algorithm for deciding whether to grant or
1567         * queue conversion requests does not by itself guarantee that such
1568         * requests are serviced on a "first come first serve" basis.  This, in
1569         * turn, can lead to a phenomenon known as "indefinate postponement".
1570         *
1571         * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572         * the system service employed to request a lock conversion.  This flag
1573         * forces certain conversion requests to be queued, even if they are
1574         * compatible with the granted modes of other locks on the same
1575         * resource.  Thus, the use of this flag results in conversion requests
1576         * being ordered on a "first come first servce" basis.
1577         *
1578         * DCT: This condition is all about new conversions being able to occur
1579         * "in place" while the lock remains on the granted queue (assuming
1580         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581         * doesn't _have_ to go onto the convert queue where it's processed in
1582         * order.  The "now" variable is necessary to distinguish converts
1583         * being received and processed for the first time now, because once a
1584         * convert is moved to the conversion queue the condition below applies
1585         * requiring fifo granting.
1586         */
1587
1588        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                return 1;
1590
1591        /*
1592         * The NOORDER flag is set to avoid the standard vms rules on grant
1593         * order.
1594         */
1595
1596        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                return 1;
1598
1599        /*
1600         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601         * granted until all other conversion requests ahead of it are granted
1602         * and/or canceled.
1603         */
1604
1605        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                return 1;
1607
1608        /*
1609         * 6-4: By default, a new request is immediately granted only if all
1610         * three of the following conditions are satisfied when the request is
1611         * issued:
1612         * - The queue of ungranted conversion requests for the resource is
1613         *   empty.
1614         * - The queue of ungranted new requests for the resource is empty.
1615         * - The mode of the new request is compatible with the most
1616         *   restrictive mode of all granted locks on the resource.
1617         */
1618
1619        if (now && !conv && list_empty(&r->res_convertqueue) &&
1620            list_empty(&r->res_waitqueue))
1621                return 1;
1622
1623        /*
1624         * 6-4: Once a lock request is in the queue of ungranted new requests,
1625         * it cannot be granted until the queue of ungranted conversion
1626         * requests is empty, all ungranted new requests ahead of it are
1627         * granted and/or canceled, and it is compatible with the granted mode
1628         * of the most restrictive lock granted on the resource.
1629         */
1630
1631        if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632            first_in_list(lkb, &r->res_waitqueue))
1633                return 1;
1634 out:
1635        return 0;
1636}
1637
1638static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                          int *err)
1640{
1641        int rv;
1642        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645        if (err)
1646                *err = 0;
1647
1648        rv = _can_be_granted(r, lkb, now);
1649        if (rv)
1650                goto out;
1651
1652        /*
1653         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655         * cancels one of the locks.
1656         */
1657
1658        if (is_convert && can_be_queued(lkb) &&
1659            conversion_deadlock_detect(r, lkb)) {
1660                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                        lkb->lkb_grmode = DLM_LOCK_NL;
1662                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                        if (err)
1665                                *err = -EDEADLK;
1666                        else {
1667                                log_print("can_be_granted deadlock %x now %d",
1668                                          lkb->lkb_id, now);
1669                                dlm_dump_rsb(r);
1670                        }
1671                }
1672                goto out;
1673        }
1674
1675        /*
1676         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677         * to grant a request in a mode other than the normal rqmode.  It's a
1678         * simple way to provide a big optimization to applications that can
1679         * use them.
1680         */
1681
1682        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                alt = DLM_LOCK_PR;
1684        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                alt = DLM_LOCK_CW;
1686
1687        if (alt) {
1688                lkb->lkb_rqmode = alt;
1689                rv = _can_be_granted(r, lkb, now);
1690                if (rv)
1691                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                else
1693                        lkb->lkb_rqmode = rqmode;
1694        }
1695 out:
1696        return rv;
1697}
1698
1699/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700   for locks pending on the convert list.  Once verified (watch for these
1701   log_prints), we should be able to just call _can_be_granted() and not
1702   bother with the demote/deadlk cases here (and there's no easy way to deal
1703   with a deadlk here, we'd have to generate something like grant_lock with
1704   the deadlk error.) */
1705
1706/* Returns the highest requested mode of all blocked conversions; sets
1707   cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710{
1711        struct dlm_lkb *lkb, *s;
1712        int hi, demoted, quit, grant_restart, demote_restart;
1713        int deadlk;
1714
1715        quit = 0;
1716 restart:
1717        grant_restart = 0;
1718        demote_restart = 0;
1719        hi = DLM_LOCK_IV;
1720
1721        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                demoted = is_demoted(lkb);
1723                deadlk = 0;
1724
1725                if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                        grant_lock_pending(r, lkb);
1727                        grant_restart = 1;
1728                        continue;
1729                }
1730
1731                if (!demoted && is_demoted(lkb)) {
1732                        log_print("WARN: pending demoted %x node %d %s",
1733                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                        demote_restart = 1;
1735                        continue;
1736                }
1737
1738                if (deadlk) {
1739                        log_print("WARN: pending deadlock %x node %d %s",
1740                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                        dlm_dump_rsb(r);
1742                        continue;
1743                }
1744
1745                hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                        *cw = 1;
1749        }
1750
1751        if (grant_restart)
1752                goto restart;
1753        if (demote_restart && !quit) {
1754                quit = 1;
1755                goto restart;
1756        }
1757
1758        return max_t(int, high, hi);
1759}
1760
1761static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762{
1763        struct dlm_lkb *lkb, *s;
1764
1765        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                if (can_be_granted(r, lkb, 0, NULL))
1767                        grant_lock_pending(r, lkb);
1768                else {
1769                        high = max_t(int, lkb->lkb_rqmode, high);
1770                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                *cw = 1;
1772                }
1773        }
1774
1775        return high;
1776}
1777
1778/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779   on either the convert or waiting queue.
1780   high is the largest rqmode of all locks blocked on the convert or
1781   waiting queue. */
1782
1783static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784{
1785        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                if (gr->lkb_highbast < DLM_LOCK_EX)
1787                        return 1;
1788                return 0;
1789        }
1790
1791        if (gr->lkb_highbast < high &&
1792            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                return 1;
1794        return 0;
1795}
1796
1797static void grant_pending_locks(struct dlm_rsb *r)
1798{
1799        struct dlm_lkb *lkb, *s;
1800        int high = DLM_LOCK_IV;
1801        int cw = 0;
1802
1803        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805        high = grant_pending_convert(r, high, &cw);
1806        high = grant_pending_wait(r, high, &cw);
1807
1808        if (high == DLM_LOCK_IV)
1809                return;
1810
1811        /*
1812         * If there are locks left on the wait/convert queue then send blocking
1813         * ASTs to granted locks based on the largest requested mode (high)
1814         * found above.
1815         */
1816
1817        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                        if (cw && high == DLM_LOCK_PR &&
1820                            lkb->lkb_grmode == DLM_LOCK_PR)
1821                                queue_bast(r, lkb, DLM_LOCK_CW);
1822                        else
1823                                queue_bast(r, lkb, high);
1824                        lkb->lkb_highbast = high;
1825                }
1826        }
1827}
1828
1829static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830{
1831        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                if (gr->lkb_highbast < DLM_LOCK_EX)
1834                        return 1;
1835                return 0;
1836        }
1837
1838        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                return 1;
1840        return 0;
1841}
1842
1843static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                            struct dlm_lkb *lkb)
1845{
1846        struct dlm_lkb *gr;
1847
1848        list_for_each_entry(gr, head, lkb_statequeue) {
1849                /* skip self when sending basts to convertqueue */
1850                if (gr == lkb)
1851                        continue;
1852                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1853                        queue_bast(r, gr, lkb->lkb_rqmode);
1854                        gr->lkb_highbast = lkb->lkb_rqmode;
1855                }
1856        }
1857}
1858
1859static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1860{
1861        send_bast_queue(r, &r->res_grantqueue, lkb);
1862}
1863
1864static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1865{
1866        send_bast_queue(r, &r->res_grantqueue, lkb);
1867        send_bast_queue(r, &r->res_convertqueue, lkb);
1868}
1869
1870/* set_master(r, lkb) -- set the master nodeid of a resource
1871
1872   The purpose of this function is to set the nodeid field in the given
1873   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1874   known, it can just be copied to the lkb and the function will return
1875   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1876   before it can be copied to the lkb.
1877
1878   When the rsb nodeid is being looked up remotely, the initial lkb
1879   causing the lookup is kept on the ls_waiters list waiting for the
1880   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1881   on the rsb's res_lookup list until the master is verified.
1882
   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
   <0: the local directory lookup failed with an error other than
      -EEXIST; that error is returned unchanged
*/
1888
1889static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1890{
1891        struct dlm_ls *ls = r->res_ls;
1892        int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1893
1894        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1895                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1896                r->res_first_lkid = lkb->lkb_id;
1897                lkb->lkb_nodeid = r->res_nodeid;
1898                return 0;
1899        }
1900
1901        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1902                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1903                return 1;
1904        }
1905
1906        if (r->res_nodeid == 0) {
1907                lkb->lkb_nodeid = 0;
1908                return 0;
1909        }
1910
1911        if (r->res_nodeid > 0) {
1912                lkb->lkb_nodeid = r->res_nodeid;
1913                return 0;
1914        }
1915
1916        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1917
1918        dir_nodeid = dlm_dir_nodeid(r);
1919
1920        if (dir_nodeid != our_nodeid) {
1921                r->res_first_lkid = lkb->lkb_id;
1922                send_lookup(r, lkb);
1923                return 1;
1924        }
1925
1926        for (i = 0; i < 2; i++) {
1927                /* It's possible for dlm_scand to remove an old rsb for
1928                   this same resource from the toss list, us to create
1929                   a new one, look up the master locally, and find it
1930                   already exists just before dlm_scand does the
1931                   dir_remove() on the previous rsb. */
1932
1933                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1934                                       r->res_length, &ret_nodeid);
1935                if (!error)
1936                        break;
1937                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1938                schedule();
1939        }
1940        if (error && error != -EEXIST)
1941                return error;
1942
1943        if (ret_nodeid == our_nodeid) {
1944                r->res_first_lkid = 0;
1945                r->res_nodeid = 0;
1946                lkb->lkb_nodeid = 0;
1947        } else {
1948                r->res_first_lkid = lkb->lkb_id;
1949                r->res_nodeid = ret_nodeid;
1950                lkb->lkb_nodeid = ret_nodeid;
1951        }
1952        return 0;
1953}
1954
1955static void process_lookup_list(struct dlm_rsb *r)
1956{
1957        struct dlm_lkb *lkb, *safe;
1958
1959        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1960                list_del_init(&lkb->lkb_rsb_lookup);
1961                _request_lock(r, lkb);
1962                schedule();
1963        }
1964}
1965
1966/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1967
1968static void confirm_master(struct dlm_rsb *r, int error)
1969{
1970        struct dlm_lkb *lkb;
1971
1972        if (!r->res_first_lkid)
1973                return;
1974
1975        switch (error) {
1976        case 0:
1977        case -EINPROGRESS:
1978                r->res_first_lkid = 0;
1979                process_lookup_list(r);
1980                break;
1981
1982        case -EAGAIN:
1983        case -EBADR:
1984        case -ENOTBLK:
1985                /* the remote request failed and won't be retried (it was
1986                   a NOQUEUE, or has been canceled/unlocked); make a waiting
1987                   lkb the first_lkid */
1988
1989                r->res_first_lkid = 0;
1990
1991                if (!list_empty(&r->res_lookup)) {
1992                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1993                                         lkb_rsb_lookup);
1994                        list_del_init(&lkb->lkb_rsb_lookup);
1995                        r->res_first_lkid = lkb->lkb_id;
1996                        _request_lock(r, lkb);
1997                }
1998                break;
1999
2000        default:
2001                log_error(r->res_ls, "confirm_master unknown error %d", error);
2002        }
2003}
2004
2005static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2006                         int namelen, unsigned long timeout_cs,
2007                         void (*ast) (void *astparam),
2008                         void *astparam,
2009                         void (*bast) (void *astparam, int mode),
2010                         struct dlm_args *args)
2011{
2012        int rv = -EINVAL;
2013
2014        /* check for invalid arg usage */
2015
2016        if (mode < 0 || mode > DLM_LOCK_EX)
2017                goto out;
2018
2019        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2020                goto out;
2021
2022        if (flags & DLM_LKF_CANCEL)
2023                goto out;
2024
2025        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2026                goto out;
2027
2028        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2029                goto out;
2030
2031        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2032                goto out;
2033
2034        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2035                goto out;
2036
2037        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2038                goto out;
2039
2040        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2041                goto out;
2042
2043        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2044                goto out;
2045
2046        if (!ast || !lksb)
2047                goto out;
2048
2049        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2050                goto out;
2051
2052        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2053                goto out;
2054
2055        /* these args will be copied to the lkb in validate_lock_args,
2056           it cannot be done now because when converting locks, fields in
2057           an active lkb cannot be modified before locking the rsb */
2058
2059        args->flags = flags;
2060        args->astfn = ast;
2061        args->astparam = astparam;
2062        args->bastfn = bast;
2063        args->timeout = timeout_cs;
2064        args->mode = mode;
2065        args->lksb = lksb;
2066        rv = 0;
2067 out:
2068        return rv;
2069}
2070
2071static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2072{
2073        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2074                      DLM_LKF_FORCEUNLOCK))
2075                return -EINVAL;
2076
2077        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2078                return -EINVAL;
2079
2080        args->flags = flags;
2081        args->astparam = astarg;
2082        return 0;
2083}
2084
2085static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2086                              struct dlm_args *args)
2087{
2088        int rv = -EINVAL;
2089
2090        if (args->flags & DLM_LKF_CONVERT) {
2091                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2092                        goto out;
2093
2094                if (args->flags & DLM_LKF_QUECVT &&
2095                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2096                        goto out;
2097
2098                rv = -EBUSY;
2099                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2100                        goto out;
2101
2102                if (lkb->lkb_wait_type)
2103                        goto out;
2104
2105                if (is_overlap(lkb))
2106                        goto out;
2107        }
2108
2109        lkb->lkb_exflags = args->flags;
2110        lkb->lkb_sbflags = 0;
2111        lkb->lkb_astfn = args->astfn;
2112        lkb->lkb_astparam = args->astparam;
2113        lkb->lkb_bastfn = args->bastfn;
2114        lkb->lkb_rqmode = args->mode;
2115        lkb->lkb_lksb = args->lksb;
2116        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2117        lkb->lkb_ownpid = (int) current->pid;
2118        lkb->lkb_timeout_cs = args->timeout;
2119        rv = 0;
2120 out:
2121        if (rv)
2122                log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2123                          rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2124                          lkb->lkb_status, lkb->lkb_wait_type,
2125                          lkb->lkb_resource->res_name);
2126        return rv;
2127}
2128
2129/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2130   for success */
2131
2132/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2133   because there may be a lookup in progress and it's valid to do
2134   cancel/unlockf on it */
2135
2136static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2137{
2138        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2139        int rv = -EINVAL;
2140
2141        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2142                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2143                dlm_print_lkb(lkb);
2144                goto out;
2145        }
2146
2147        /* an lkb may still exist even though the lock is EOL'ed due to a
2148           cancel, unlock or failed noqueue request; an app can't use these
2149           locks; return same error as if the lkid had not been found at all */
2150
2151        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2152                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2153                rv = -ENOENT;
2154                goto out;
2155        }
2156
2157        /* an lkb may be waiting for an rsb lookup to complete where the
2158           lookup was initiated by another lock */
2159
2160        if (!list_empty(&lkb->lkb_rsb_lookup)) {
2161                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2162                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2163                        list_del_init(&lkb->lkb_rsb_lookup);
2164                        queue_cast(lkb->lkb_resource, lkb,
2165                                   args->flags & DLM_LKF_CANCEL ?
2166                                   -DLM_ECANCEL : -DLM_EUNLOCK);
2167                        unhold_lkb(lkb); /* undoes create_lkb() */
2168                }
2169                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2170                rv = -EBUSY;
2171                goto out;
2172        }
2173
2174        /* cancel not allowed with another cancel/unlock in progress */
2175
2176        if (args->flags & DLM_LKF_CANCEL) {
2177                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2178                        goto out;
2179
2180                if (is_overlap(lkb))
2181                        goto out;
2182
2183                /* don't let scand try to do a cancel */
2184                del_timeout(lkb);
2185
2186                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2187                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2188                        rv = -EBUSY;
2189                        goto out;
2190                }
2191
2192                /* there's nothing to cancel */
2193                if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2194                    !lkb->lkb_wait_type) {
2195                        rv = -EBUSY;
2196                        goto out;
2197                }
2198
2199                switch (lkb->lkb_wait_type) {
2200                case DLM_MSG_LOOKUP:
2201                case DLM_MSG_REQUEST:
2202                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2203                        rv = -EBUSY;
2204                        goto out;
2205                case DLM_MSG_UNLOCK:
2206                case DLM_MSG_CANCEL:
2207                        goto out;
2208                }
2209                /* add_to_waiters() will set OVERLAP_CANCEL */
2210                goto out_ok;
2211        }
2212
2213        /* do we need to allow a force-unlock if there's a normal unlock
2214           already in progress?  in what conditions could the normal unlock
2215           fail such that we'd want to send a force-unlock to be sure? */
2216
2217        if (args->flags & DLM_LKF_FORCEUNLOCK) {
2218                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2219                        goto out;
2220
2221                if (is_overlap_unlock(lkb))
2222                        goto out;
2223
2224                /* don't let scand try to do a cancel */
2225                del_timeout(lkb);
2226
2227                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2228                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2229                        rv = -EBUSY;
2230                        goto out;
2231                }
2232
2233                switch (lkb->lkb_wait_type) {
2234                case DLM_MSG_LOOKUP:
2235                case DLM_MSG_REQUEST:
2236                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2237                        rv = -EBUSY;
2238                        goto out;
2239                case DLM_MSG_UNLOCK:
2240                        goto out;
2241                }
2242                /* add_to_waiters() will set OVERLAP_UNLOCK */
2243                goto out_ok;
2244        }
2245
2246        /* normal unlock not allowed if there's any op in progress */
2247        rv = -EBUSY;
2248        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2249                goto out;
2250
2251 out_ok:
2252        /* an overlapping op shouldn't blow away exflags from other op */
2253        lkb->lkb_exflags |= args->flags;
2254        lkb->lkb_sbflags = 0;
2255        lkb->lkb_astparam = args->astparam;
2256        rv = 0;
2257 out:
2258        if (rv)
2259                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2260                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2261                          args->flags, lkb->lkb_wait_type,
2262                          lkb->lkb_resource->res_name);
2263        return rv;
2264}
2265
2266/*
2267 * Four stage 4 varieties:
2268 * do_request(), do_convert(), do_unlock(), do_cancel()
2269 * These are called on the master node for the given lock and
2270 * from the central locking logic.
2271 */
2272
2273static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2274{
2275        int error = 0;
2276
2277        if (can_be_granted(r, lkb, 1, NULL)) {
2278                grant_lock(r, lkb);
2279                queue_cast(r, lkb, 0);
2280                goto out;
2281        }
2282
2283        if (can_be_queued(lkb)) {
2284                error = -EINPROGRESS;
2285                add_lkb(r, lkb, DLM_LKSTS_WAITING);
2286                add_timeout(lkb);
2287                goto out;
2288        }
2289
2290        error = -EAGAIN;
2291        queue_cast(r, lkb, -EAGAIN);
2292 out:
2293        return error;
2294}
2295
/*
 * Follow-up work for do_request(), keyed on its return value @error:
 * a queued request (-EINPROGRESS) sends blocking ASTs to the grant
 * queue; a refused one (-EAGAIN) sends them to the grant and convert
 * queues, but only if force_blocking_asts() says so.  Other values
 * need nothing.
 */
static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == -EAGAIN) {
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
	} else if (error == -EINPROGRESS) {
		send_blocking_asts(r, lkb);
	}
}
2309
2310static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2311{
2312        int error = 0;
2313        int deadlk = 0;
2314
2315        /* changing an existing lock may allow others to be granted */
2316
2317        if (can_be_granted(r, lkb, 1, &deadlk)) {
2318                grant_lock(r, lkb);
2319                queue_cast(r, lkb, 0);
2320                goto out;
2321        }
2322
2323        /* can_be_granted() detected that this lock would block in a conversion
2324           deadlock, so we leave it on the granted queue and return EDEADLK in
2325           the ast for the convert. */
2326
2327        if (deadlk) {
2328                /* it's left on the granted queue */
2329                log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2330                          lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2331                          lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2332                revert_lock(r, lkb);
2333                queue_cast(r, lkb, -EDEADLK);
2334                error = -EDEADLK;
2335                goto out;
2336        }
2337
2338        /* is_demoted() means the can_be_granted() above set the grmode
2339           to NL, and left us on the granted queue.  This auto-demotion
2340           (due to CONVDEADLK) might mean other locks, and/or this lock, are
2341           now grantable.  We have to try to grant other converting locks
2342           before we try again to grant this one. */
2343
2344        if (is_demoted(lkb)) {
2345                grant_pending_convert(r, DLM_LOCK_IV, NULL);
2346                if (_can_be_granted(r, lkb, 1)) {
2347                        grant_lock(r, lkb);
2348                        queue_cast(r, lkb, 0);
2349                        goto out;
2350                }
2351                /* else fall through and move to convert queue */
2352        }
2353
2354        if (can_be_queued(lkb)) {
2355                error = -EINPROGRESS;
2356                del_lkb(r, lkb);
2357                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2358                add_timeout(lkb);
2359                goto out;
2360        }
2361
2362        error = -EAGAIN;
2363        queue_cast(r, lkb, -EAGAIN);
2364 out:
2365        return error;
2366}
2367
/*
 * Follow-up work for do_convert(), keyed on its return value @error:
 * a granted conversion (0) runs grant_pending_locks(), which also sends
 * basts; a queued one (-EINPROGRESS) sends blocking ASTs to the grant
 * queue; a refused one (-EAGAIN) sends them to the grant and convert
 * queues if force_blocking_asts() says so.  Other values need nothing.
 */
static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == 0) {
		grant_pending_locks(r);
	} else if (error == -EAGAIN) {
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
	} else if (error == -EINPROGRESS) {
		send_blocking_asts(r, lkb);
	}
}
2385
2386static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2387{
2388        remove_lock(r, lkb);
2389        queue_cast(r, lkb, -DLM_EUNLOCK);
2390        return -DLM_EUNLOCK;
2391}
2392
/*
 * Follow-up work for do_unlock(): try to grant the locks still pending
 * on r.  @lkb and @error are not used.
 */
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r);
}
2398
2399/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2400 
2401static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2402{
2403        int error;
2404
2405        error = revert_lock(r, lkb);
2406        if (error) {
2407                queue_cast(r, lkb, -DLM_ECANCEL);
2408                return -DLM_ECANCEL;
2409        }
2410        return 0;
2411}
2412
/*
 * Follow-up work for do_cancel(): when it reported a non-zero result
 * (@error), try to grant the locks still pending on r.
 */
static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (!error)
		return;

	grant_pending_locks(r);
}
2419
2420/*
2421 * Four stage 3 varieties:
2422 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2423 */
2424
2425/* add a new lkb to a possibly new rsb, called by requesting process */
2426
/*
 * Stage 3 of a new request: resolve the master through set_master(),
 * then either send the request to the remote master or run
 * do_request() and its effects locally.
 *
 * Returns a negative error from set_master() unchanged, 0 when
 * set_master() reports that the lkb has to wait, and otherwise the
 * result of send_request() or do_request().
 */
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;	/* the lkb is waiting for the master */

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	rv = do_request(r, lkb);
	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	do_request_effects(r, lkb, rv);
	return rv;
}
2453
2454/* change some property of an existing lkb, e.g. mode */
2455
/*
 * Stage 3 of a conversion: send it to the remote master, or run
 * do_convert() and its effects locally.  Returns the result of
 * send_convert() or do_convert().
 */
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	rv = do_convert(r, lkb);
	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	do_convert_effects(r, lkb, rv);
	return rv;
}
2472
2473/* remove an existing lkb from the granted queue */
2474
/*
 * Stage 3 of an unlock: send it to the remote master, or run
 * do_unlock() and its effects locally.  Returns the result of
 * send_unlock() or do_unlock().
 */
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	rv = do_unlock(r, lkb);
	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	do_unlock_effects(r, lkb, rv);
	return rv;
}
2491
2492/* remove an existing lkb from the convert or wait queue */
2493
/*
 * Stage 3 of a cancel: send it to the remote master, or run
 * do_cancel() and its effects locally.  Returns the result of
 * send_cancel() or do_cancel().
 */
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	rv = do_cancel(r, lkb);
	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	do_cancel_effects(r, lkb, rv);
	return rv;
}
2510
2511/*
2512 * Four stage 2 varieties:
2513 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2514 */
2515
2516static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2517                        int len, struct dlm_args *args)
2518{
2519        struct dlm_rsb *r;
2520        int error;
2521
2522        error = validate_lock_args(ls, lkb, args);
2523        if (error)
2524                goto out;
2525
2526        error = find_rsb(ls, name, len, R_CREATE, &r);
2527        if (error)
2528                goto out;
2529
2530        lock_rsb(r);
2531
2532        attach_lkb(r, lkb);
2533        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2534
2535        error = _request_lock(r, lkb);
2536
2537        unlock_rsb(r);
2538        put_rsb(r);
2539
2540 out:
2541        return error;
2542}
2543
2544static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2545                        struct dlm_args *args)
2546{
2547        struct dlm_rsb *r;
2548        int error;
2549
2550        r = lkb->lkb_resource;
2551
2552        hold_rsb(r);
2553        lock_rsb(r);
2554
2555        error = validate_lock_args(ls, lkb, args);
2556        if (error)
2557                goto out;
2558
2559        error = _convert_lock(r, lkb);
2560 out:
2561        unlock_rsb(r);
2562        put_rsb(r);
2563        return error;
2564}
2565
2566static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2567                       struct dlm_args *args)
2568{
2569        struct dlm_rsb *r;
2570        int error;
2571
2572        r = lkb->lkb_resource;
2573
2574        hold_rsb(r);
2575        lock_rsb(r);
2576
2577        error = validate_unlock_args(lkb, args);
2578        if (error)
2579                goto out;
2580
2581        error = _unlock_lock(r, lkb);
2582 out:
2583        unlock_rsb(r);
2584        put_rsb(r);
2585        return error;
2586}
2587
2588static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2589                       struct dlm_args *args)
2590{
2591        struct dlm_rsb *r;
2592        int error;
2593
2594        r = lkb->lkb_resource;
2595
2596        hold_rsb(r);
2597        lock_rsb(r);
2598
2599        error = validate_unlock_args(lkb, args);
2600        if (error)
2601                goto out;
2602
2603        error = _cancel_lock(r, lkb);
2604 out:
2605        unlock_rsb(r);
2606        put_rsb(r);
2607        return error;
2608}
2609
2610/*
2611 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2612 */
2613
2614int dlm_lock(dlm_lockspace_t *lockspace,
2615             int mode,
2616             struct dlm_lksb *lksb,
2617             uint32_t flags,
2618             void *name,
2619             unsigned int namelen,
2620             uint32_t parent_lkid,
2621             void (*ast) (void *astarg),
2622             void *astarg,
2623             void (*bast) (void *astarg, int mode))
2624{
2625        struct dlm_ls *ls;
2626        struct dlm_lkb *lkb;
2627        struct dlm_args args;
2628        int error, convert = flags & DLM_LKF_CONVERT;
2629
2630        ls = dlm_find_lockspace_local(lockspace);
2631        if (!ls)
2632                return -EINVAL;
2633
2634        dlm_lock_recovery(ls);
2635
2636        if (convert)
2637                error = find_lkb(ls, lksb->sb_lkid, &lkb);
2638        else
2639                error = create_lkb(ls, &lkb);
2640
2641        if (error)
2642                goto out;
2643
2644        error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2645                              astarg, bast, &args);
2646        if (error)
2647                goto out_put;
2648
2649        if (convert)
2650                error = convert_lock(ls, lkb, &args);
2651        else
2652                error = request_lock(ls, lkb, name, namelen, &args);
2653
2654        if (error == -EINPROGRESS)
2655                error = 0;
2656 out_put:
2657        if (convert || error)
2658                __put_lkb(ls, lkb);
2659        if (error == -EAGAIN || error == -EDEADLK)
2660                error = 0;
2661 out:
2662        dlm_unlock_recovery(ls);
2663        dlm_put_lockspace(ls);
2664        return error;
2665}
2666
2667int dlm_unlock(dlm_lockspace_t *lockspace,
2668               uint32_t lkid,
2669               uint32_t flags,
2670               struct dlm_lksb *lksb,
2671               void *astarg)
2672{
2673        struct dlm_ls *ls;
2674        struct dlm_lkb *lkb;
2675        struct dlm_args args;
2676        int error;
2677
2678        ls = dlm_find_lockspace_local(lockspace);
2679        if (!ls)
2680                return -EINVAL;
2681
2682        dlm_lock_recovery(ls);
2683
2684        error = find_lkb(ls, lkid, &lkb);
2685        if (error)
2686                goto out;
2687
2688        error = set_unlock_args(flags, astarg, &args);
2689        if (error)
2690                goto out_put;
2691
2692        if (flags & DLM_LKF_CANCEL)
2693                error = cancel_lock(ls, lkb, &args);
2694        else
2695                error = unlock_lock(ls, lkb, &args);
2696
2697        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2698                error = 0;
2699        if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2700                error = 0;
2701 out_put:
2702        dlm_put_lkb(lkb);
2703 out:
2704        dlm_unlock_recovery(ls);
2705        dlm_put_lockspace(ls);
2706        return error;
2707}
2708
2709/*
2710 * send/receive routines for remote operations and replies
2711 *
2712 * send_args
2713 * send_common
2714 * send_request                 receive_request
2715 * send_convert                 receive_convert
2716 * send_unlock                  receive_unlock
2717 * send_cancel                  receive_cancel
2718 * send_grant                   receive_grant
2719 * send_bast                    receive_bast
2720 * send_lookup                  receive_lookup
2721 * send_remove                  receive_remove
2722 *
2723 *                              send_common_reply
2724 * receive_request_reply        send_request_reply
2725 * receive_convert_reply        send_convert_reply
2726 * receive_unlock_reply         send_unlock_reply
2727 * receive_cancel_reply         send_cancel_reply
2728 * receive_lookup_reply         send_lookup_reply
2729 */
2730
2731static int _create_message(struct dlm_ls *ls, int mb_len,
2732                           int to_nodeid, int mstype,
2733                           struct dlm_message **ms_ret,
2734                           struct dlm_mhandle **mh_ret)
2735{
2736        struct dlm_message *ms;
2737        struct dlm_mhandle *mh;
2738        char *mb;
2739
2740        /* get_buffer gives us a message handle (mh) that we need to
2741           pass into lowcomms_commit and a message buffer (mb) that we
2742           write our data into */
2743
2744        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2745        if (!mh)
2746                return -ENOBUFS;
2747
2748        memset(mb, 0, mb_len);
2749
2750        ms = (struct dlm_message *) mb;
2751
2752        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2753        ms->m_header.h_lockspace = ls->ls_global_id;
2754        ms->m_header.h_nodeid = dlm_our_nodeid();
2755        ms->m_header.h_length = mb_len;
2756        ms->m_header.h_cmd = DLM_MSG;
2757
2758        ms->m_type = mstype;
2759
2760        *mh_ret = mh;
2761        *ms_ret = ms;
2762        return 0;
2763}
2764
2765static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2766                          int to_nodeid, int mstype,
2767                          struct dlm_message **ms_ret,
2768                          struct dlm_mhandle **mh_ret)
2769{
2770        int mb_len = sizeof(struct dlm_message);
2771
2772        switch (mstype) {
2773        case DLM_MSG_REQUEST:
2774        case DLM_MSG_LOOKUP:
2775        case DLM_MSG_REMOVE:
2776                mb_len += r->res_length;
2777                break;
2778        case DLM_MSG_CONVERT:
2779        case DLM_MSG_UNLOCK:
2780        case DLM_MSG_REQUEST_REPLY:
2781        case DLM_MSG_CONVERT_REPLY:
2782        case DLM_MSG_GRANT:
2783                if (lkb && lkb->lkb_lvbptr)
2784                        mb_len += r->res_ls->ls_lvblen;
2785                break;
2786        }
2787
2788        return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2789                               ms_ret, mh_ret);
2790}
2791
2792/* further lowcomms enhancements or alternate implementations may make
2793   the return value from this function useful at some point */
2794
/*
 * Finish an outgoing message: run dlm_message_out() on it and commit
 * the buffer behind mh to lowcomms.  Always returns 0.
 */
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);

	dlm_lowcomms_commit_buffer(mh);

	return 0;
}
2801
2802static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2803                      struct dlm_message *ms)
2804{
2805        ms->m_nodeid   = lkb->lkb_nodeid;
2806        ms->m_pid      = lkb->lkb_ownpid;
2807        ms->m_lkid     = lkb->lkb_id;
2808        ms->m_remid    = lkb->lkb_remid;
2809        ms->m_exflags  = lkb->lkb_exflags;
2810        ms->m_sbflags  = lkb->lkb_sbflags;
2811        ms->m_flags    = lkb->lkb_flags;
2812        ms->m_lvbseq   = lkb->lkb_lvbseq;
2813        ms->m_status   = lkb->lkb_status;
2814        ms->m_grmode   = lkb->lkb_grmode;
2815        ms->m_rqmode   = lkb->lkb_rqmode;
2816        ms->m_hash     = r->res_hash;
2817
2818        /* m_result and m_bastmode are set from function args,
2819           not from lkb fields */
2820
2821        if (lkb->lkb_bastfn)
2822                ms->m_asts |= AST_BAST;
2823        if (lkb->lkb_astfn)
2824                ms->m_asts |= AST_COMP;
2825
2826        /* compare with switch in create_message; send_remove() doesn't
2827           use send_args() */
2828
2829        switch (ms->m_type) {
2830        case DLM_MSG_REQUEST:
2831        case DLM_MSG_LOOKUP:
2832                memcpy(ms->m_extra, r->res_name, r->res_length);
2833                break;
2834        case DLM_MSG_CONVERT:
2835        case DLM_MSG_UNLOCK:
2836        case DLM_MSG_REQUEST_REPLY:
2837        case DLM_MSG_CONVERT_REPLY:
2838        case DLM_MSG_GRANT:
2839                if (!lkb->lkb_lvbptr)
2840                        break;
2841                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2842                break;
2843        }
2844}
2845
2846static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2847{
2848        struct dlm_message *ms;
2849        struct dlm_mhandle *mh;
2850        int to_nodeid, error;
2851
2852        error = add_to_waiters(lkb, mstype);
2853        if (error)
2854                return error;
2855
2856        to_nodeid = r->res_nodeid;
2857
2858        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2859        if (error)
2860                goto fail;
2861
2862        send_args(r, lkb, ms);
2863
2864        error = send_message(mh, ms);
2865        if (error)
2866                goto fail;
2867        return 0;
2868
2869 fail:
2870        remove_from_waiters(lkb, msg_reply_type(mstype));
2871        return error;
2872}
2873
2874static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2875{
2876        return send_common(r, lkb, DLM_MSG_REQUEST);
2877}
2878
2879static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2880{
2881        int error;
2882
2883        error = send_common(r, lkb, DLM_MSG_CONVERT);
2884
2885        /* down conversions go without a reply from the master */
2886        if (!error && down_conversion(lkb)) {
2887                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2888                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2889                r->res_ls->ls_stub_ms.m_result = 0;
2890                r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2891                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2892        }
2893
2894        return error;
2895}
2896
2897/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2898   MASTER_UNCERTAIN to force the next request on the rsb to confirm
2899   that the master is still correct. */
2900
2901static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902{
2903        return send_common(r, lkb, DLM_MSG_UNLOCK);
2904}
2905
2906static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2907{
2908        return send_common(r, lkb, DLM_MSG_CANCEL);
2909}
2910
2911static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2912{
2913        struct dlm_message *ms;
2914        struct dlm_mhandle *mh;
2915        int to_nodeid, error;
2916
2917        to_nodeid = lkb->lkb_nodeid;
2918
2919        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2920        if (error)
2921                goto out;
2922
2923        send_args(r, lkb, ms);
2924
2925        ms->m_result = 0;
2926
2927        error = send_message(mh, ms);
2928 out:
2929        return error;
2930}
2931
2932static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2933{
2934        struct dlm_message *ms;
2935        struct dlm_mhandle *mh;
2936        int to_nodeid, error;
2937
2938        to_nodeid = lkb->lkb_nodeid;
2939
2940        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2941        if (error)
2942                goto out;
2943
2944        send_args(r, lkb, ms);
2945
2946        ms->m_bastmode = mode;
2947
2948        error = send_message(mh, ms);
2949 out:
2950        return error;
2951}
2952
2953static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2954{
2955        struct dlm_message *ms;
2956        struct dlm_mhandle *mh;
2957        int to_nodeid, error;
2958
2959        error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2960        if (error)
2961                return error;
2962
2963        to_nodeid = dlm_dir_nodeid(r);
2964
2965        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2966        if (error)
2967                goto fail;
2968
2969        send_args(r, lkb, ms);
2970
2971        error = send_message(mh, ms);
2972        if (error)
2973                goto fail;
2974        return 0;
2975
2976 fail:
2977        remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2978        return error;
2979}
2980
2981static int send_remove(struct dlm_rsb *r)
2982{
2983        struct dlm_message *ms;
2984        struct dlm_mhandle *mh;
2985        int to_nodeid, error;
2986
2987        to_nodeid = dlm_dir_nodeid(r);
2988
2989        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2990        if (error)
2991                goto out;
2992
2993        memcpy(ms->m_extra, r->res_name, r->res_length);
2994        ms->m_hash = r->res_hash;
2995
2996        error = send_message(mh, ms);
2997 out:
2998        return error;
2999}
3000
3001static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3002                             int mstype, int rv)
3003{
3004        struct dlm_message *ms;
3005        struct dlm_mhandle *mh;
3006        int to_nodeid, error;
3007
3008        to_nodeid = lkb->lkb_nodeid;
3009
3010        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3011        if (error)
3012                goto out;
3013
3014        send_args(r, lkb, ms);
3015
3016        ms->m_result = rv;
3017
3018        error = send_message(mh, ms);
3019 out:
3020        return error;
3021}
3022
3023static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3024{
3025        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3026}
3027
3028static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3029{
3030        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3031}
3032
3033static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3034{
3035        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3036}
3037
3038static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3039{
3040        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3041}
3042
3043static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3044                             int ret_nodeid, int rv)
3045{
3046        struct dlm_rsb *r = &ls->ls_stub_rsb;
3047        struct dlm_message *ms;
3048        struct dlm_mhandle *mh;
3049        int error, nodeid = ms_in->m_header.h_nodeid;
3050
3051        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3052        if (error)
3053                goto out;
3054
3055        ms->m_lkid = ms_in->m_lkid;
3056        ms->m_result = rv;
3057        ms->m_nodeid = ret_nodeid;
3058
3059        error = send_message(mh, ms);
3060 out:
3061        return error;
3062}
3063
3064/* which args we save from a received message depends heavily on the type
3065   of message, unlike the send side where we can safely send everything about
3066   the lkb for any type of message */
3067
3068static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3069{
3070        lkb->lkb_exflags = ms->m_exflags;
3071        lkb->lkb_sbflags = ms->m_sbflags;
3072        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3073                         (ms->m_flags & 0x0000FFFF);
3074}
3075
3076static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3077{
3078        lkb->lkb_sbflags = ms->m_sbflags;
3079        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3080                         (ms->m_flags & 0x0000FFFF);
3081}
3082
3083static int receive_extralen(struct dlm_message *ms)
3084{
3085        return (ms->m_header.h_length - sizeof(struct dlm_message));
3086}
3087
3088static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3089                       struct dlm_message *ms)
3090{
3091        int len;
3092
3093        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3094                if (!lkb->lkb_lvbptr)
3095                        lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3096                if (!lkb->lkb_lvbptr)
3097                        return -ENOMEM;
3098                len = receive_extralen(ms);
3099                if (len > DLM_RESNAME_MAXLEN)
3100                        len = DLM_RESNAME_MAXLEN;
3101                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3102        }
3103        return 0;
3104}
3105
/*
 * Placeholder blocking-ast callback.  receive_request_args() installs it
 * on an lkb when the incoming message has AST_BAST set; it only logs
 * that it was called.
 */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
	return;
}
3110
/*
 * Placeholder completion-ast callback.  receive_request_args() installs
 * it on an lkb when the incoming message has AST_COMP set; it only logs
 * that it was called.
 */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
	return;
}
3115
3116static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3117                                struct dlm_message *ms)
3118{
3119        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3120        lkb->lkb_ownpid = ms->m_pid;
3121        lkb->lkb_remid = ms->m_lkid;
3122        lkb->lkb_grmode = DLM_LOCK_IV;
3123        lkb->lkb_rqmode = ms->m_rqmode;
3124
3125        lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3126        lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3127
3128        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3129                /* lkb was just created so there won't be an lvb yet */
3130                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3131                if (!lkb->lkb_lvbptr)
3132                        return -ENOMEM;
3133        }
3134
3135        return 0;
3136}
3137
3138static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3139                                struct dlm_message *ms)
3140{
3141        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3142                return -EBUSY;
3143
3144        if (receive_lvb(ls, lkb, ms))
3145                return -ENOMEM;
3146
3147        lkb->lkb_rqmode = ms->m_rqmode;
3148        lkb->lkb_lvbseq = ms->m_lvbseq;
3149
3150        return 0;
3151}
3152
/*
 * Apply the arguments of a received unlock message to the lkb; the only
 * one is the lvb.  Returns -ENOMEM if receive_lvb() fails, otherwise 0.
 */
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3160
3161/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3162   uses to send a reply and that the remote end uses to process the reply. */
3163
3164static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3165{
3166        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3167        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3168        lkb->lkb_remid = ms->m_lkid;
3169}
3170
3171/* This is called after the rsb is locked so that we can safely inspect
3172   fields in the lkb. */
3173
3174static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3175{
3176        int from = ms->m_header.h_nodeid;
3177        int error = 0;
3178
3179        switch (ms->m_type) {
3180        case DLM_MSG_CONVERT:
3181        case DLM_MSG_UNLOCK:
3182        case DLM_MSG_CANCEL:
3183                if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3184                        error = -EINVAL;
3185                break;
3186
3187        case DLM_MSG_CONVERT_REPLY:
3188        case DLM_MSG_UNLOCK_REPLY:
3189        case DLM_MSG_CANCEL_REPLY:
3190        case DLM_MSG_GRANT:
3191        case DLM_MSG_BAST:
3192                if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3193                        error = -EINVAL;
3194                break;
3195
3196        case DLM_MSG_REQUEST_REPLY:
3197                if (!is_process_copy(lkb))
3198                        error = -EINVAL;
3199                else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3200                        error = -EINVAL;
3201                break;
3202
3203        default:
3204                error = -EINVAL;
3205        }
3206
3207        if (error)
3208                log_error(lkb->lkb_resource->res_ls,
3209                          "ignore invalid message %d from %d %x %x %x %d",
3210                          ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3211                          lkb->lkb_flags, lkb->lkb_nodeid);
3212        return error;
3213}
3214
3215static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3216{
3217        struct dlm_lkb *lkb;
3218        struct dlm_rsb *r;
3219        int error, namelen;
3220
3221        error = create_lkb(ls, &lkb);
3222        if (error)
3223                goto fail;
3224
3225        receive_flags(lkb, ms);
3226        lkb->lkb_flags |= DLM_IFL_MSTCPY;
3227        error = receive_request_args(ls, lkb, ms);
3228        if (error) {
3229                __put_lkb(ls, lkb);
3230                goto fail;
3231        }
3232
3233        namelen = receive_extralen(ms);
3234
3235        error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3236        if (error) {
3237                __put_lkb(ls, lkb);
3238                goto fail;
3239        }
3240
3241        lock_rsb(r);
3242
3243        attach_lkb(r, lkb);
3244        error = do_request(r, lkb);
3245        send_request_reply(r, lkb, error);
3246        do_request_effects(r, lkb, error);
3247
3248        unlock_rsb(r);
3249        put_rsb(r);
3250
3251        if (error == -EINPROGRESS)
3252                error = 0;
3253        if (error)
3254                dlm_put_lkb(lkb);
3255        return;
3256
3257 fail:
3258        setup_stub_lkb(ls, ms);
3259        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3260}
3261
3262static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3263{
3264        struct dlm_lkb *lkb;
3265        struct dlm_rsb *r;
3266        int error, reply = 1;
3267
3268        error = find_lkb(ls, ms->m_remid, &lkb);
3269        if (error)
3270                goto fail;
3271
3272        r = lkb->lkb_resource;
3273
3274        hold_rsb(r);
3275        lock_rsb(r);
3276
3277        error = validate_message(lkb, ms);
3278        if (error)
3279                goto out;
3280
3281        receive_flags(lkb, ms);
3282
3283        error = receive_convert_args(ls, lkb, ms);
3284        if (error) {
3285                send_convert_reply(r, lkb, error);
3286                goto out;
3287        }
3288
3289        reply = !down_conversion(lkb);
3290
3291        error = do_convert(r, lkb);
3292        if (reply)
3293                send_convert_reply(r, lkb, error);
3294        do_convert_effects(r, lkb, error);
3295 out:
3296        unlock_rsb(r);
3297        put_rsb(r);
3298        dlm_put_lkb(lkb);
3299        return;
3300
3301 fail:
3302        setup_stub_lkb(ls, ms);
3303        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3304}
3305
3306static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3307{
3308        struct dlm_lkb *lkb;
3309        struct dlm_rsb *r;
3310        int error;
3311
3312        error = find_lkb(ls, ms->m_remid, &lkb);
3313        if (error)
3314                goto fail;
3315
3316        r = lkb->lkb_resource;
3317
3318        hold_rsb(r);
3319        lock_rsb(r);
3320
3321        error = validate_message(lkb, ms);
3322        if (error)
3323                goto out;
3324
3325        receive_flags(lkb, ms);
3326
3327        error = receive_unlock_args(ls, lkb, ms);
3328        if (error) {
3329                send_unlock_reply(r, lkb, error);
3330                goto out;
3331        }
3332
3333        error = do_unlock(r, lkb);
3334        send_unlock_reply(r, lkb, error);
3335        do_unlock_effects(r, lkb, error);
3336 out:
3337        unlock_rsb(r);
3338        put_rsb(r);
3339        dlm_put_lkb(lkb);
3340        return;
3341
3342 fail:
3343        setup_stub_lkb(ls, ms);
3344        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3345}
3346
3347static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3348{
3349        struct dlm_lkb *lkb;
3350        struct dlm_rsb *r;
3351        int error;
3352
3353        error = find_lkb(ls, ms->m_remid, &lkb);
3354        if (error)
3355                goto fail;
3356
3357        receive_flags(lkb, ms);
3358
3359        r = lkb->lkb_resource;
3360
3361        hold_rsb(r);
3362        lock_rsb(r);
3363
3364        error = validate_message(lkb, ms);
3365        if (error)
3366                goto out;
3367
3368        error = do_cancel(r, lkb);
3369        send_cancel_reply(r, lkb, error);
3370        do_cancel_effects(r, lkb, error);
3371 out:
3372        unlock_rsb(r);
3373        put_rsb(r);
3374        dlm_put_lkb(lkb);
3375        return;
3376
3377 fail:
3378        setup_stub_lkb(ls, ms);
3379        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3380}
3381
3382static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3383{
3384        struct dlm_lkb *lkb;
3385        struct dlm_rsb *r;
3386        int error;
3387
3388        error = find_lkb(ls, ms->m_remid, &lkb);
3389        if (error) {
3390                log_debug(ls, "receive_grant from %d no lkb %x",
3391                          ms->m_header.h_nodeid, ms->m_remid);
3392                return;
3393        }
3394
3395        r = lkb->lkb_resource;
3396
3397        hold_rsb(r);
3398        lock_rsb(r);
3399
3400        error = validate_message(lkb, ms);
3401        if (error)
3402                goto out;
3403
3404        receive_flags_reply(lkb, ms);
3405        if (is_altmode(lkb))
3406                munge_altmode(lkb, ms);
3407        grant_lock_pc(r, lkb, ms);
3408        queue_cast(r, lkb, 0);
3409 out:
3410        unlock_rsb(r);
3411        put_rsb(r);
3412        dlm_put_lkb(lkb);
3413}
3414
3415static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3416{
3417        struct dlm_lkb *lkb;
3418        struct dlm_rsb *r;
3419        int error;
3420
3421        error = find_lkb(ls, ms->m_remid, &lkb);
3422        if (error) {
3423                log_debug(ls, "receive_bast from %d no lkb %x",
3424                          ms->m_header.h_nodeid, ms->m_remid);
3425                return;
3426        }
3427
3428        r = lkb->lkb_resource;
3429
3430        hold_rsb(r);
3431        lock_rsb(r);
3432
3433        error = validate_message(lkb, ms);
3434        if (error)
3435                goto out;
3436
3437        queue_bast(r, lkb, ms->m_bastmode);
3438 out:
3439        unlock_rsb(r);
3440        put_rsb(r);
3441        dlm_put_lkb(lkb);
3442}
3443
3444static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3445{
3446        int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3447
3448        from_nodeid = ms->m_header.h_nodeid;
3449        our_nodeid = dlm_our_nodeid();
3450
3451        len = receive_extralen(ms);
3452
3453        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3454        if (dir_nodeid != our_nodeid) {
3455                log_error(ls, "lookup dir_nodeid %d from %d",
3456                          dir_nodeid, from_nodeid);
3457                error = -EINVAL;
3458                ret_nodeid = -1;
3459                goto out;
3460        }
3461
3462        error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3463
3464        /* Optimization: we're master so treat lookup as a request */
3465        if (!error && ret_nodeid == our_nodeid) {
3466                receive_request(ls, ms);
3467                return;
3468        }
3469 out:
3470        send_lookup_reply(ls, ms, ret_nodeid, error);
3471}
3472
3473static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3474{
3475        int len, dir_nodeid, from_nodeid;
3476
3477        from_nodeid = ms->m_header.h_nodeid;
3478
3479        len = receive_extralen(ms);
3480
3481        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3482        if (dir_nodeid != dlm_our_nodeid()) {
3483                log_error(ls, "remove dir entry dir_nodeid %d from %d",
3484                          dir_nodeid, from_nodeid);
3485                return;
3486        }
3487
3488        dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3489}
3490
3491static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3492{
3493        do_purge(ls, ms->m_nodeid, ms->m_pid);
3494}
3495
3496static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3497{
3498        struct dlm_lkb *lkb;
3499        struct dlm_rsb *r;
3500        int error, mstype, result;
3501
3502        error = find_lkb(ls, ms->m_remid, &lkb);
3503        if (error) {
3504                log_debug(ls, "receive_request_reply from %d no lkb %x",
3505                          ms->m_header.h_nodeid, ms->m_remid);
3506                return;
3507        }
3508
3509        r = lkb->lkb_resource;
3510        hold_rsb(r);
3511        lock_rsb(r);
3512
3513        error = validate_message(lkb, ms);
3514        if (error)
3515                goto out;
3516
3517        mstype = lkb->lkb_wait_type;
3518        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3519        if (error)
3520                goto out;
3521
3522        /* Optimization: the dir node was also the master, so it took our
3523           lookup as a request and sent request reply instead of lookup reply */
3524        if (mstype == DLM_MSG_LOOKUP) {
3525                r->res_nodeid = ms->m_header.h_nodeid;
3526                lkb->lkb_nodeid = r->res_nodeid;
3527        }
3528
3529        /* this is the value returned from do_request() on the master */
3530        result = ms->m_result;
3531
3532        switch (result) {
3533        case -EAGAIN:
3534                /* request would block (be queued) on remote master */
3535                queue_cast(r, lkb, -EAGAIN);
3536                confirm_master(r, -EAGAIN);
3537                unhold_lkb(lkb); /* undoes create_lkb() */
3538                break;
3539
3540        case -EINPROGRESS:
3541        case 0:
3542                /* request was queued or granted on remote master */
3543                receive_flags_reply(lkb, ms);
3544                lkb->lkb_remid = ms->m_lkid;
3545                if (is_altmode(lkb))
3546                        munge_altmode(lkb, ms);
3547                if (result) {
3548                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
3549                        add_timeout(lkb);
3550                } else {
3551                        grant_lock_pc(r, lkb, ms);
3552                        queue_cast(r, lkb, 0);
3553                }
3554                confirm_master(r, result);
3555                break;
3556
3557        case -EBADR:
3558        case -ENOTBLK:
3559                /* find_rsb failed to find rsb or rsb wasn't master */
3560                log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3561                          lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3562                r->res_nodeid = -1;
3563                lkb->lkb_nodeid = -1;
3564
3565                if (is_overlap(lkb)) {
3566                        /* we'll ignore error in cancel/unlock reply */
3567                        queue_cast_overlap(r, lkb);
3568                        confirm_master(r, result);
3569                        unhold_lkb(lkb); /* undoes create_lkb() */
3570                } else
3571                        _request_lock(r, lkb);
3572                break;
3573
3574        default:
3575                log_error(ls, "receive_request_reply %x error %d",
3576                          lkb->lkb_id, result);
3577        }
3578
3579        if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3580                log_debug(ls, "receive_request_reply %x result %d unlock",
3581                          lkb->lkb_id, result);
3582                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3583                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3584                send_unlock(r, lkb);
3585        } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3586                log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3587                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3588                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3589                send_cancel(r, lkb);
3590        } else {
3591                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3592                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3593        }
3594 out:
3595        unlock_rsb(r);
3596        put_rsb(r);
3597        dlm_put_lkb(lkb);
3598}
3599
3600static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3601                                    struct dlm_message *ms)
3602{
3603        /* this is the value returned from do_convert() on the master */
3604        switch (ms->m_result) {
3605        case -EAGAIN:
3606                /* convert would block (be queued) on remote master */
3607                queue_cast(r, lkb, -EAGAIN);
3608                break;
3609
3610        case -EDEADLK:
3611                receive_flags_reply(lkb, ms);
3612                revert_lock_pc(r, lkb);
3613                queue_cast(r, lkb, -EDEADLK);
3614                break;
3615
3616        case -EINPROGRESS:
3617                /* convert was queued on remote master */
3618                receive_flags_reply(lkb, ms);
3619                if (is_demoted(lkb))
3620                        munge_demoted(lkb, ms);
3621                del_lkb(r, lkb);
3622                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3623                add_timeout(lkb);
3624                break;
3625
3626        case 0:
3627                /* convert was granted on remote master */
3628                receive_flags_reply(lkb, ms);
3629                if (is_demoted(lkb))
3630                        munge_demoted(lkb, ms);
3631                grant_lock_pc(r, lkb, ms);
3632                queue_cast(r, lkb, 0);
3633                break;
3634
3635        default:
3636                log_error(r->res_ls, "receive_convert_reply %x error %d",
3637                          lkb->lkb_id, ms->m_result);
3638        }
3639}
3640
3641static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3642{
3643        struct dlm_rsb *r = lkb->lkb_resource;
3644        int error;
3645
3646        hold_rsb(r);
3647        lock_rsb(r);
3648
3649        error = validate_message(lkb, ms);
3650        if (error)
3651                goto out;
3652
3653        /* stub reply can happen with waiters_mutex held */
3654        error = remove_from_waiters_ms(lkb, ms);
3655        if (error)
3656                goto out;
3657
3658        __receive_convert_reply(r, lkb, ms);
3659 out:
3660        unlock_rsb(r);
3661        put_rsb(r);
3662}
3663
3664static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3665{
3666        struct dlm_lkb *lkb;
3667        int error;
3668
3669        error = find_lkb(ls, ms->m_remid, &lkb);
3670        if (error) {
3671                log_debug(ls, "receive_convert_reply from %d no lkb %x",
3672                          ms->m_header.h_nodeid, ms->m_remid);
3673                return;
3674        }
3675
3676        _receive_convert_reply(lkb, ms);
3677        dlm_put_lkb(lkb);
3678}
3679
3680static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3681{
3682        struct dlm_rsb *r = lkb->lkb_resource;
3683        int error;
3684
3685        hold_rsb(r);
3686        lock_rsb(r);
3687
3688        error = validate_message(lkb, ms);
3689        if (error)
3690                goto out;
3691
3692        /* stub reply can happen with waiters_mutex held */
3693        error = remove_from_waiters_ms(lkb, ms);
3694        if (error)
3695                goto out;
3696
3697        /* this is the value returned from do_unlock() on the master */
3698
3699        switch (ms->m_result) {
3700        case -DLM_EUNLOCK:
3701                receive_flags_reply(lkb, ms);
3702                remove_lock_pc(r, lkb);
3703                queue_cast(r, lkb, -DLM_EUNLOCK);
3704                break;
3705        case -ENOENT:
3706                break;
3707        default:
3708                log_error(r->res_ls, "receive_unlock_reply %x error %d",
3709                          lkb->lkb_id, ms->m_result);
3710        }
3711 out:
3712        unlock_rsb(r);
3713        put_rsb(r);
3714}
3715
3716static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3717{
3718        struct dlm_lkb *lkb;
3719        int error;
3720
3721        error = find_lkb(ls, ms->m_remid, &lkb);
3722        if (error) {
3723                log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3724                          ms->m_header.h_nodeid, ms->m_remid);
3725                return;
3726        }
3727
3728        _receive_unlock_reply(lkb, ms);
3729        dlm_put_lkb(lkb);
3730}
3731
3732static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3733{
3734        struct dlm_rsb *r = lkb->lkb_resource;
3735        int error;
3736
3737        hold_rsb(r);
3738        lock_rsb(r);
3739
3740        error = validate_message(lkb, ms);
3741        if (error)
3742                goto out;
3743
3744        /* stub reply can happen with waiters_mutex held */
3745        error = remove_from_waiters_ms(lkb, ms);
3746        if (error)
3747                goto out;
3748
3749        /* this is the value returned from do_cancel() on the master */
3750
3751        switch (ms->m_result) {
3752        case -DLM_ECANCEL:
3753                receive_flags_reply(lkb, ms);
3754                revert_lock_pc(r, lkb);
3755                queue_cast(r, lkb, -DLM_ECANCEL);
3756                break;
3757        case 0:
3758                break;
3759        default:
3760                log_error(r->res_ls, "receive_cancel_reply %x error %d",
3761                          lkb->lkb_id, ms->m_result);
3762        }
3763 out:
3764        unlock_rsb(r);
3765        put_rsb(r);
3766}
3767
3768static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3769{
3770        struct dlm_lkb *lkb;
3771        int error;
3772
3773        error = find_lkb(ls, ms->m_remid, &lkb);
3774        if (error) {
3775                log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3776                          ms->m_header.h_nodeid, ms->m_remid);
3777                return;
3778        }
3779
3780        _receive_cancel_reply(lkb, ms);
3781        dlm_put_lkb(lkb);
3782}
3783
3784static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3785{
3786        struct dlm_lkb *lkb;
3787        struct dlm_rsb *r;
3788        int error, ret_nodeid;
3789
3790        error = find_lkb(ls, ms->m_lkid, &lkb);
3791        if (error) {
3792                log_error(ls, "receive_lookup_reply no lkb");
3793                return;
3794        }
3795
3796        /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3797           FIXME: will a non-zero error ever be returned? */
3798
3799        r = lkb->lkb_resource;
3800        hold_rsb(r);
3801        lock_rsb(r);
3802
3803        error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3804        if (error)
3805                goto out;
3806
3807        ret_nodeid = ms->m_nodeid;
3808        if (ret_nodeid == dlm_our_nodeid()) {
3809                r->res_nodeid = 0;
3810                ret_nodeid = 0;
3811                r->res_first_lkid = 0;
3812        } else {
3813                /* set_master() will copy res_nodeid to lkb_nodeid */
3814                r->res_nodeid = ret_nodeid;
3815        }
3816
3817        if (is_overlap(lkb)) {
3818                log_debug(ls, "receive_lookup_reply %x unlock %x",
3819                          lkb->lkb_id, lkb->lkb_flags);
3820                queue_cast_overlap(r, lkb);
3821                unhold_lkb(lkb); /* undoes create_lkb() */
3822                goto out_list;
3823        }
3824
3825        _request_lock(r, lkb);
3826
3827 out_list:
3828        if (!ret_nodeid)
3829                process_lookup_list(r);
3830 out:
3831        unlock_rsb(r);
3832        put_rsb(r);
3833        dlm_put_lkb(lkb);
3834}
3835
3836static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3837{
3838        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3839                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3840                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3841                          ms->m_remid, ms->m_result);
3842                return;
3843        }
3844
3845        switch (ms->m_type) {
3846
3847        /* messages sent to a master node */
3848
3849        case DLM_MSG_REQUEST:
3850                receive_request(ls, ms);
3851                break;
3852
3853        case DLM_MSG_CONVERT:
3854                receive_convert(ls, ms);
3855                break;
3856
3857        case DLM_MSG_UNLOCK:
3858                receive_unlock(ls, ms);
3859                break;
3860
3861        case DLM_MSG_CANCEL:
3862                receive_cancel(ls, ms);
3863                break;
3864
3865        /* messages sent from a master node (replies to above) */
3866
3867        case DLM_MSG_REQUEST_REPLY:
3868                receive_request_reply(ls, ms);
3869                break;
3870
3871        case DLM_MSG_CONVERT_REPLY:
3872                receive_convert_reply(ls, ms);
3873                break;
3874
3875        case DLM_MSG_UNLOCK_REPLY:
3876                receive_unlock_reply(ls, ms);
3877                break;
3878
3879        case DLM_MSG_CANCEL_REPLY:
3880                receive_cancel_reply(ls, ms);
3881                break;
3882
3883        /* messages sent from a master node (only two types of async msg) */
3884
3885        case DLM_MSG_GRANT:
3886                receive_grant(ls, ms);
3887                break;
3888
3889        case DLM_MSG_BAST:
3890                receive_bast(ls, ms);
3891                break;
3892
3893        /* messages sent to a dir node */
3894
3895        case DLM_MSG_LOOKUP:
3896                receive_lookup(ls, ms);
3897                break;
3898
3899        case DLM_MSG_REMOVE:
3900                receive_remove(ls, ms);
3901                break;
3902
3903        /* messages sent from a dir node (remove has no reply) */
3904
3905        case DLM_MSG_LOOKUP_REPLY:
3906                receive_lookup_reply(ls, ms);
3907                break;
3908
3909        /* other messages */
3910
3911        case DLM_MSG_PURGE:
3912                receive_purge(ls, ms);
3913                break;
3914
3915        default:
3916                log_error(ls, "unknown message type %d", ms->m_type);
3917        }
3918
3919        dlm_astd_wake();
3920}
3921
/* While the lockspace is in recovery mode (locking stopped), normal
   messages are not processed but saved on the requestqueue until recovery
   is done.  Outside of recovery we first wait for dlm_recoverd to drain
   the saved messages from the requestqueue and only then process the new
   one.  That wait matters right after recovery completes, when we move
   from saving every message, to processing the saved messages, to
   processing new messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (!dlm_locking_stopped(ls)) {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
		return;
	}

	dlm_add_requestqueue(ls, nodeid, ms);
}
3940
/* Entry point used by dlm_recoverd for messages that were saved on the
   requestqueue; they go straight to _receive_message() without the
   locking-stopped check done in dlm_receive_message(). */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3948
3949/* This is called by the midcomms layer when something is received for
3950   the lockspace.  It could be either a MSG (normal message sent as part of
3951   standard locking activity) or an RCOM (recovery message sent as part of
3952   lockspace recovery). */
3953
3954void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3955{
3956        struct dlm_header *hd = &p->header;
3957        struct dlm_ls *ls;
3958        int type = 0;
3959
3960        switch (hd->h_cmd) {
3961        case DLM_MSG:
3962                dlm_message_in(&p->message);
3963                type = p->message.m_type;
3964                break;
3965        case DLM_RCOM:
3966                dlm_rcom_in(&p->rcom);
3967                type = p->rcom.rc_type;
3968                break;
3969        default:
3970                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3971                return;
3972        }
3973
3974        if (hd->h_nodeid != nodeid) {
3975                log_print("invalid h_nodeid %d from %d lockspace %x",
3976                          hd->h_nodeid, nodeid, hd->h_lockspace);
3977                return;
3978        }
3979
3980        ls = dlm_find_lockspace_global(hd->h_lockspace);
3981        if (!ls) {
3982                if (dlm_config.ci_log_debug)
3983                        log_print("invalid lockspace %x from %d cmd %d type %d",
3984                                  hd->h_lockspace, nodeid, hd->h_cmd, type);
3985
3986                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3987                        dlm_send_ls_not_ready(nodeid, &p->rcom);
3988                return;
3989        }
3990
3991        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3992           be inactive (in this ls) before transitioning to recovery mode */
3993
3994        down_read(&ls->ls_recv_active);
3995        if (hd->h_cmd == DLM_MSG)
3996                dlm_receive_message(ls, &p->message, nodeid);
3997        else
3998                dlm_receive_rcom(ls, &p->rcom, nodeid);
3999        up_read(&ls->ls_recv_active);
4000
4001        dlm_put_lockspace(ls);
4002}
4003
4004static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4005{
4006        if (middle_conversion(lkb)) {
4007                hold_lkb(lkb);
4008                ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4009                ls->ls_stub_ms.m_result = -EINPROGRESS;
4010                ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4011                ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4012                _receive_convert_reply(lkb, &ls->ls_stub_ms);
4013
4014                /* Same special case as in receive_rcom_lock_args() */
4015                lkb->lkb_grmode = DLM_LOCK_IV;
4016                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4017                unhold_lkb(lkb);
4018
4019        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4020                lkb->lkb_flags |= DLM_IFL_RESEND;
4021        }
4022
4023        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4024           conversions are async; there's no reply from the remote master */
4025}
4026
4027/* A waiting lkb needs recovery if the master node has failed, or
4028   the master node is changing (only when no directory is used) */
4029
4030static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4031{
4032        if (dlm_is_removed(ls, lkb->lkb_nodeid))
4033                return 1;
4034
4035        if (!dlm_no_directory(ls))
4036                return 0;
4037
4038        if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4039                return 1;
4040
4041        return 0;
4042}
4043
4044/* Recovery for locks that are waiting for replies from nodes that are now
4045   gone.  We can just complete unlocks and cancels by faking a reply from the
4046   dead node.  Requests and up-conversions we flag to be resent after
4047   recovery.  Down-conversions can just be completed with a fake reply like
4048   unlocks.  Conversions between PR and CW need special attention. */
4049
4050void dlm_recover_waiters_pre(struct dlm_ls *ls)
4051{
4052        struct dlm_lkb *lkb, *safe;
4053        int wait_type, stub_unlock_result, stub_cancel_result;
4054
4055        mutex_lock(&ls->ls_waiters_mutex);
4056
4057        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4058                log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4059                          lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4060
4061                /* all outstanding lookups, regardless of destination  will be
4062                   resent after recovery is done */
4063
4064                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4065                        lkb->lkb_flags |= DLM_IFL_RESEND;
4066                        continue;
4067                }
4068
4069                if (!waiter_needs_recovery(ls, lkb))
4070                        continue;
4071
4072                wait_type = lkb->lkb_wait_type;
4073                stub_unlock_result = -DLM_EUNLOCK;
4074                stub_cancel_result = -DLM_ECANCEL;
4075
4076                /* Main reply may have been received leaving a zero wait_type,
4077                   but a reply for the overlapping op may not have been
4078                   received.  In that case we need to fake the appropriate
4079                   reply for the overlap op. */
4080
4081                if (!wait_type) {
4082                        if (is_overlap_cancel(lkb)) {
4083                                wait_type = DLM_MSG_CANCEL;
4084                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4085                                        stub_cancel_result = 0;
4086                        }
4087                        if (is_overlap_unlock(lkb)) {
4088                                wait_type = DLM_MSG_UNLOCK;
4089                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4090                                        stub_unlock_result = -ENOENT;
4091                        }
4092
4093                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
4094                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
4095                                  stub_cancel_result, stub_unlock_result);
4096                }
4097
4098                switch (wait_type) {
4099
4100                case DLM_MSG_REQUEST:
4101                        lkb->lkb_flags |= DLM_IFL_RESEND;
4102                        break;
4103
4104                case DLM_MSG_CONVERT:
4105                        recover_convert_waiter(ls, lkb);
4106                        break;
4107
4108                case DLM_MSG_UNLOCK:
4109                        hold_lkb(lkb);
4110                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4111                        ls->ls_stub_ms.m_result = stub_unlock_result;
4112                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4113                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4114                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4115                        dlm_put_lkb(lkb);
4116                        break;
4117
4118                case DLM_MSG_CANCEL:
4119                        hold_lkb(lkb);
4120                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4121                        ls->ls_stub_ms.m_result = stub_cancel_result;
4122                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4123                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4124                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4125                        dlm_put_lkb(lkb);
4126                        break;
4127
4128                default:
4129                        log_error(ls, "invalid lkb wait_type %d %d",
4130                                  lkb->lkb_wait_type, wait_type);
4131                }
4132                schedule();
4133        }
4134        mutex_unlock(&ls->ls_waiters_mutex);
4135}
4136
4137static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4138{
4139        struct dlm_lkb *lkb;
4140        int found = 0;
4141
4142        mutex_lock(&ls->ls_waiters_mutex);
4143        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4144                if (lkb->lkb_flags & DLM_IFL_RESEND) {
4145                        hold_lkb(lkb);
4146                        found = 1;
4147                        break;
4148                }
4149        }
4150        mutex_unlock(&ls->ls_waiters_mutex);
4151
4152        if (!found)
4153                lkb = NULL;
4154        return lkb;
4155}
4156
4157/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4158   master or dir-node for r.  Processing the lkb may result in it being placed
4159   back on waiters. */
4160
4161/* We do this after normal locking has been enabled and any saved messages
4162   (in requestqueue) have been processed.  We should be confident that at
4163   this point we won't get or process a reply to any of these waiting
4164   operations.  But, new ops may be coming in on the rsbs/locks here from
4165   userspace or remotely. */
4166
/* There may have been an overlap unlock/cancel prior to recovery or after
   recovery.  If before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.  We
   can be confident here that any replies to either the initial op or overlap
   ops prior to recovery have been received. */
4172
4173int dlm_recover_waiters_post(struct dlm_ls *ls)
4174{
4175        struct dlm_lkb *lkb;
4176        struct dlm_rsb *r;
4177        int error = 0, mstype, err, oc, ou;
4178
4179        while (1) {
4180                if (dlm_locking_stopped(ls)) {
4181                        log_debug(ls, "recover_waiters_post aborted");
4182                        error = -EINTR;
4183                        break;
4184                }
4185
4186                lkb = find_resend_waiter(ls);
4187                if (!lkb)
4188                        break;
4189
4190                r = lkb->lkb_resource;
4191                hold_rsb(r);
4192                lock_rsb(r);
4193
4194                mstype = lkb->lkb_wait_type;
4195                oc = is_overlap_cancel(lkb);
4196                ou = is_overlap_unlock(lkb);
4197                err = 0;
4198
4199                log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4200                          lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4201
4202                /* At this point we assume that we won't get a reply to any
4203                   previous op or overlap op on this lock.  First, do a big
4204                   remove_from_waiters() for all previous ops. */
4205
4206                lkb->lkb_flags &= ~DLM_IFL_RESEND;
4207                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4208                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4209                lkb->lkb_wait_type = 0;
4210                lkb->lkb_wait_count = 0;
4211                mutex_lock(&ls->ls_waiters_mutex);
4212                list_del_init(&lkb->lkb_wait_reply);
4213                mutex_unlock(&ls->ls_waiters_mutex);
4214                unhold_lkb(lkb); /* for waiters list */
4215
4216                if (oc || ou) {
4217                        /* do an unlock or cancel instead of resending */
4218                        switch (mstype) {
4219                        case DLM_MSG_LOOKUP:
4220                        case DLM_MSG_REQUEST:
4221                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4222                                                        -DLM_ECANCEL);
4223                                unhold_lkb(lkb); /* undoes create_lkb() */
4224                                break;
4225                        case DLM_MSG_CONVERT:
4226                                if (oc) {
4227                                        queue_cast(r, lkb, -DLM_ECANCEL);
4228                                } else {
4229                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4230                                        _unlock_lock(r, lkb);
4231                                }
4232                                break;
4233                        default:
4234                                err = 1;
4235                        }
4236                } else {
4237                        switch (mstype) {
4238                        case DLM_MSG_LOOKUP:
4239                        case DLM_MSG_REQUEST:
4240                                _request_lock(r, lkb);
4241                                if (is_master(r))
4242                                        confirm_master(r, 0);
4243                                break;
4244                        case DLM_MSG_CONVERT:
4245                                _convert_lock(r, lkb);
4246                                break;
4247                        default:
4248                                err = 1;
4249                        }
4250                }
4251
4252                if (err)
4253                        log_error(ls, "recover_waiters_post %x %d %x %d %d",
4254                                  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4255                unlock_rsb(r);
4256                put_rsb(r);
4257                dlm_put_lkb(lkb);
4258        }
4259
4260        return error;
4261}
4262
4263static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4264                        int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4265{
4266        struct dlm_ls *ls = r->res_ls;
4267        struct dlm_lkb *lkb, *safe;
4268
4269        list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4270                if (test(ls, lkb)) {
4271                        rsb_set_flag(r, RSB_LOCKS_PURGED);
4272                        del_lkb(r, lkb);
4273                        /* this put should free the lkb */
4274                        if (!dlm_put_lkb(lkb))
4275                                log_error(ls, "purged lkb not released");
4276                }
4277        }
4278}
4279
4280static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4281{
4282        return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4283}
4284
/* purge_queue() test: selects every master-copy lkb.  ls is unused and is
   only there to match the test callback signature. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
4289
4290static void purge_dead_locks(struct dlm_rsb *r)
4291{
4292        purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4293        purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4294        purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4295}
4296
4297void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4298{
4299        purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4300        purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4301        purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4302}
4303
4304/* Get rid of locks held by nodes that are gone. */
4305
4306int dlm_purge_locks(struct dlm_ls *ls)
4307{
4308        struct dlm_rsb *r;
4309
4310        log_debug(ls, "dlm_purge_locks");
4311
4312        down_write(&ls->ls_root_sem);
4313        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4314                hold_rsb(r);
4315                lock_rsb(r);
4316                if (is_master(r))
4317                        purge_dead_locks(r);
4318                unlock_rsb(r);
4319                unhold_rsb(r);
4320
4321                schedule();
4322        }
4323        up_write(&ls->ls_root_sem);
4324
4325        return 0;
4326}
4327
4328static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4329{
4330        struct dlm_rsb *r, *r_ret = NULL;
4331
4332        spin_lock(&ls->ls_rsbtbl[bucket].lock);
4333        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4334                if (!rsb_flag(r, RSB_LOCKS_PURGED))
4335                        continue;
4336                hold_rsb(r);
4337                rsb_clear_flag(r, RSB_LOCKS_PURGED);
4338                r_ret = r;
4339                break;
4340        }
4341        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4342        return r_ret;
4343}
4344
4345void dlm_grant_after_purge(struct dlm_ls *ls)
4346{
4347        struct dlm_rsb *r;
4348        int bucket = 0;
4349
4350        while (1) {
4351                r = find_purged_rsb(ls, bucket);
4352                if (!r) {
4353                        if (bucket == ls->ls_rsbtbl_size - 1)
4354                                break;
4355                        bucket++;
4356                        continue;
4357                }
4358                lock_rsb(r);
4359                if (is_master(r)) {
4360                        grant_pending_locks(r);
4361                        confirm_master(r, 0);
4362                }
4363                unlock_rsb(r);
4364                put_rsb(r);
4365                schedule();
4366        }
4367}
4368
4369static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4370                                         uint32_t remid)
4371{
4372        struct dlm_lkb *lkb;
4373
4374        list_for_each_entry(lkb, head, lkb_statequeue) {
4375                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4376                        return lkb;
4377        }
4378        return NULL;
4379}
4380
4381static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4382                                    uint32_t remid)
4383{
4384        struct dlm_lkb *lkb;
4385
4386        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4387        if (lkb)
4388                return lkb;
4389        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4390        if (lkb)
4391                return lkb;
4392        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4393        if (lkb)
4394                return lkb;
4395        return NULL;
4396}
4397
4398/* needs at least dlm_rcom + rcom_lock */
4399static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4400                                  struct dlm_rsb *r, struct dlm_rcom *rc)
4401{
4402        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4403
4404        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4405        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4406        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4407        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4408        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4409        lkb->lkb_flags |= DLM_IFL_MSTCPY;
4410        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4411        lkb->lkb_rqmode = rl->rl_rqmode;
4412        lkb->lkb_grmode = rl->rl_grmode;
4413        /* don't set lkb_status because add_lkb wants to itself */
4414
4415        lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4416        lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4417
4418        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4419                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4420                         sizeof(struct rcom_lock);
4421                if (lvblen > ls->ls_lvblen)
4422                        return -EINVAL;
4423                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4424                if (!lkb->lkb_lvbptr)
4425                        return -ENOMEM;
4426                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4427        }
4428
4429        /* Conversions between PR and CW (middle modes) need special handling.
4430           The real granted mode of these converting locks cannot be determined
4431           until all locks have been rebuilt on the rsb (recover_conversion) */
4432
4433        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4434            middle_conversion(lkb)) {
4435                rl->rl_status = DLM_LKSTS_CONVERT;
4436                lkb->lkb_grmode = DLM_LOCK_IV;
4437                rsb_set_flag(r, RSB_RECOVER_CONVERT);
4438        }
4439
4440        return 0;
4441}
4442
4443/* This lkb may have been recovered in a previous aborted recovery so we need
4444   to check if the rsb already has an lkb with the given remote nodeid/lkid.
4445   If so we just send back a standard reply.  If not, we create a new lkb with
4446   the given values and send back our lkid.  We send back our lkid by sending
4447   back the rcom_lock struct we got but with the remid field filled in. */
4448
4449/* needs at least dlm_rcom + rcom_lock */
4450int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4451{
4452        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4453        struct dlm_rsb *r;
4454        struct dlm_lkb *lkb;
4455        int error;
4456
4457        if (rl->rl_parent_lkid) {
4458                error = -EOPNOTSUPP;
4459                goto out;
4460        }
4461
4462        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4463                         R_MASTER, &r);
4464        if (error)
4465                goto out;
4466
4467        lock_rsb(r);
4468
4469        lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4470        if (lkb) {
4471                error = -EEXIST;
4472                goto out_remid;
4473        }
4474
4475        error = create_lkb(ls, &lkb);
4476        if (error)
4477                goto out_unlock;
4478
4479        error = receive_rcom_lock_args(ls, lkb, r, rc);
4480        if (error) {
4481                __put_lkb(ls, lkb);
4482                goto out_unlock;
4483        }
4484
4485        attach_lkb(r, lkb);
4486        add_lkb(r, lkb, rl->rl_status);
4487        error = 0;
4488
4489 out_remid:
4490        /* this is the new value returned to the lock holder for
4491           saving in its process-copy lkb */
4492        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4493
4494 out_unlock:
4495        unlock_rsb(r);
4496        put_rsb(r);
4497 out:
4498        if (error)
4499                log_debug(ls, "recover_master_copy %d %x", error,
4500                          le32_to_cpu(rl->rl_lkid));
4501        rl->rl_result = cpu_to_le32(error);
4502        return error;
4503}
4504
4505/* needs at least dlm_rcom + rcom_lock */
4506int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4507{
4508        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4509        struct dlm_rsb *r;
4510        struct dlm_lkb *lkb;
4511        int error;
4512
4513        error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4514        if (error) {
4515                log_error(ls, "recover_process_copy no lkid %x",
4516                                le32_to_cpu(rl->rl_lkid));
4517                return error;
4518        }
4519
4520        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4521
4522        error = le32_to_cpu(rl->rl_result);
4523
4524        r = lkb->lkb_resource;
4525        hold_rsb(r);
4526        lock_rsb(r);
4527
4528        switch (error) {
4529        case -EBADR:
4530                /* There's a chance the new master received our lock before
4531                   dlm_recover_master_reply(), this wouldn't happen if we did
4532                   a barrier between recover_masters and recover_locks. */
4533                log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4534                          (unsigned long)r, r->res_name);
4535                dlm_send_rcom_lock(r, lkb);
4536                goto out;
4537        case -EEXIST:
4538                log_debug(ls, "master copy exists %x", lkb->lkb_id);
4539                /* fall through */
4540        case 0:
4541                lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4542                break;
4543        default:
4544                log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4545                          error, lkb->lkb_id);
4546        }
4547
4548        /* an ack for dlm_recover_locks() which waits for replies from
4549           all the locks it sends to new masters */
4550        dlm_recovered_lock(r);
4551 out:
4552        unlock_rsb(r);
4553        put_rsb(r);
4554        dlm_put_lkb(lkb);
4555
4556        return 0;
4557}
4558
4559int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4560                     int mode, uint32_t flags, void *name, unsigned int namelen,
4561                     unsigned long timeout_cs)
4562{
4563        struct dlm_lkb *lkb;
4564        struct dlm_args args;
4565        int error;
4566
4567        dlm_lock_recovery(ls);
4568
4569        error = create_lkb(ls, &lkb);
4570        if (error) {
4571                kfree(ua);
4572                goto out;
4573        }
4574
4575        if (flags & DLM_LKF_VALBLK) {
4576                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4577                if (!ua->lksb.sb_lvbptr) {
4578                        kfree(ua);
4579                        __put_lkb(ls, lkb);
4580                        error = -ENOMEM;
4581                        goto out;
4582                }
4583        }
4584
4585        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4586           When DLM_IFL_USER is set, the dlm knows that this is a userspace
4587           lock and that lkb_astparam is the dlm_user_args structure. */
4588
4589        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4590                              fake_astfn, ua, fake_bastfn, &args);
4591        lkb->lkb_flags |= DLM_IFL_USER;
4592        ua->old_mode = DLM_LOCK_IV;
4593
4594        if (error) {
4595                __put_lkb(ls, lkb);
4596                goto out;
4597        }
4598
4599        error = request_lock(ls, lkb, name, namelen, &args);
4600
4601        switch (error) {
4602        case 0:
4603                break;
4604        case -EINPROGRESS:
4605                error = 0;
4606                break;
4607        case -EAGAIN:
4608                error = 0;
4609                /* fall through */
4610        default:
4611                __put_lkb(ls, lkb);
4612                goto out;
4613        }
4614
4615        /* add this new lkb to the per-process list of locks */
4616        spin_lock(&ua->proc->locks_spin);
4617        hold_lkb(lkb);
4618        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4619        spin_unlock(&ua->proc->locks_spin);
4620 out:
4621        dlm_unlock_recovery(ls);
4622        return error;
4623}
4624
4625int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4626                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4627                     unsigned long timeout_cs)
4628{
4629        struct dlm_lkb *lkb;
4630        struct dlm_args args;
4631        struct dlm_user_args *ua;
4632        int error;
4633
4634        dlm_lock_recovery(ls);
4635
4636        error = find_lkb(ls, lkid, &lkb);
4637        if (error)
4638                goto out;
4639
4640        /* user can change the params on its lock when it converts it, or
4641           add an lvb that didn't exist before */
4642
4643        ua = lkb->lkb_ua;
4644
4645        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4646                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4647                if (!ua->lksb.sb_lvbptr) {
4648                        error = -ENOMEM;
4649                        goto out_put;
4650                }
4651        }
4652        if (lvb_in && ua->lksb.sb_lvbptr)
4653                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4654
4655        ua->xid = ua_tmp->xid;
4656        ua->castparam = ua_tmp->castparam;
4657        ua->castaddr = ua_tmp->castaddr;
4658        ua->bastparam = ua_tmp->bastparam;
4659        ua->bastaddr = ua_tmp->bastaddr;
4660        ua->user_lksb = ua_tmp->user_lksb;
4661        ua->old_mode = lkb->lkb_grmode;
4662
4663        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4664                              fake_astfn, ua, fake_bastfn, &args);
4665        if (error)
4666                goto out_put;
4667
4668        error = convert_lock(ls, lkb, &args);
4669
4670        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4671                error = 0;
4672 out_put:
4673        dlm_put_lkb(lkb);
4674 out:
4675        dlm_unlock_recovery(ls);
4676        kfree(ua_tmp);
4677        return error;
4678}
4679
4680int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4681                    uint32_t flags, uint32_t lkid, char *lvb_in)
4682{
4683        struct dlm_lkb *lkb;
4684        struct dlm_args args;
4685        struct dlm_user_args *ua;
4686        int error;
4687
4688        dlm_lock_recovery(ls);
4689
4690        error = find_lkb(ls, lkid, &lkb);
4691        if (error)
4692                goto out;
4693
4694        ua = lkb->lkb_ua;
4695
4696        if (lvb_in && ua->lksb.sb_lvbptr)
4697                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4698        if (ua_tmp->castparam)
4699                ua->castparam = ua_tmp->castparam;
4700        ua->user_lksb = ua_tmp->user_lksb;
4701
4702        error = set_unlock_args(flags, ua, &args);
4703        if (error)
4704                goto out_put;
4705
4706        error = unlock_lock(ls, lkb, &args);
4707
4708        if (error == -DLM_EUNLOCK)
4709                error = 0;
4710        /* from validate_unlock_args() */
4711        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4712                error = 0;
4713        if (error)
4714                goto out_put;
4715
4716        spin_lock(&ua->proc->locks_spin);
4717        /* dlm_user_add_ast() may have already taken lkb off the proc list */
4718        if (!list_empty(&lkb->lkb_ownqueue))
4719                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4720        spin_unlock(&ua->proc->locks_spin);
4721 out_put:
4722        dlm_put_lkb(lkb);
4723 out:
4724        dlm_unlock_recovery(ls);
4725        kfree(ua_tmp);
4726        return error;
4727}
4728
4729int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4730                    uint32_t flags, uint32_t lkid)
4731{
4732        struct dlm_lkb *lkb;
4733        struct dlm_args args;
4734        struct dlm_user_args *ua;
4735        int error;
4736
4737        dlm_lock_recovery(ls);
4738
4739        error = find_lkb(ls, lkid, &lkb);
4740        if (error)
4741                goto out;
4742
4743        ua = lkb->lkb_ua;
4744        if (ua_tmp->castparam)
4745                ua->castparam = ua_tmp->castparam;
4746        ua->user_lksb = ua_tmp->user_lksb;
4747
4748        error = set_unlock_args(flags, ua, &args);
4749        if (error)
4750                goto out_put;
4751
4752        error = cancel_lock(ls, lkb, &args);
4753
4754        if (error == -DLM_ECANCEL)
4755                error = 0;
4756        /* from validate_unlock_args() */
4757        if (error == -EBUSY)
4758                error = 0;
4759 out_put:
4760        dlm_put_lkb(lkb);
4761 out:
4762        dlm_unlock_recovery(ls);
4763        kfree(ua_tmp);
4764        return error;
4765}
4766
4767int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4768{
4769        struct dlm_lkb *lkb;
4770        struct dlm_args args;
4771        struct dlm_user_args *ua;
4772        struct dlm_rsb *r;
4773        int error;
4774
4775        dlm_lock_recovery(ls);
4776
4777        error = find_lkb(ls, lkid, &lkb);
4778        if (error)
4779                goto out;
4780
4781        ua = lkb->lkb_ua;
4782
4783        error = set_unlock_args(flags, ua, &args);
4784        if (error)
4785                goto out_put;
4786
4787        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4788
4789        r = lkb->lkb_resource;
4790        hold_rsb(r);
4791        lock_rsb(r);
4792
4793        error = validate_unlock_args(lkb, &args);
4794        if (error)
4795                goto out_r;
4796        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4797
4798        error = _cancel_lock(r, lkb);
4799 out_r:
4800        unlock_rsb(r);
4801        put_rsb(r);
4802
4803        if (error == -DLM_ECANCEL)
4804                error = 0;
4805        /* from validate_unlock_args() */
4806        if (error == -EBUSY)
4807                error = 0;
4808 out_put:
4809        dlm_put_lkb(lkb);
4810 out:
4811        dlm_unlock_recovery(ls);
4812        return error;
4813}
4814
4815/* lkb's that are removed from the waiters list by revert are just left on the
4816   orphans list with the granted orphan locks, to be freed by purge */
4817
4818static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4819{
4820        struct dlm_args args;
4821        int error;
4822
4823        hold_lkb(lkb);
4824        mutex_lock(&ls->ls_orphans_mutex);
4825        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4826        mutex_unlock(&ls->ls_orphans_mutex);
4827
4828        set_unlock_args(0, lkb->lkb_ua, &args);
4829
4830        error = cancel_lock(ls, lkb, &args);
4831        if (error == -DLM_ECANCEL)
4832                error = 0;
4833        return error;
4834}
4835
4836/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4837   Regardless of what rsb queue the lock is on, it's removed and freed. */
4838
4839static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4840{
4841        struct dlm_args args;
4842        int error;
4843
4844        set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4845
4846        error = unlock_lock(ls, lkb, &args);
4847        if (error == -DLM_EUNLOCK)
4848                error = 0;
4849        return error;
4850}
4851
4852/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4853   (which does lock_rsb) due to deadlock with receiving a message that does
4854   lock_rsb followed by dlm_user_add_ast() */
4855
4856static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4857                                     struct dlm_user_proc *proc)
4858{
4859        struct dlm_lkb *lkb = NULL;
4860
4861        mutex_lock(&ls->ls_clear_proc_locks);
4862        if (list_empty(&proc->locks))
4863                goto out;
4864
4865        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4866        list_del_init(&lkb->lkb_ownqueue);
4867
4868        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4869                lkb->lkb_flags |= DLM_IFL_ORPHAN;
4870        else
4871                lkb->lkb_flags |= DLM_IFL_DEAD;
4872 out:
4873        mutex_unlock(&ls->ls_clear_proc_locks);
4874        return lkb;
4875}
4876
4877/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4878   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4879   which we clear here. */
4880
4881/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4882   list, and no more device_writes should add lkb's to proc->locks list; so we
4883   shouldn't need to take asts_spin or locks_spin here.  this assumes that
4884   device reads/writes/closes are serialized -- FIXME: we may need to serialize
4885   them ourself. */
4886
4887void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4888{
4889        struct dlm_lkb *lkb, *safe;
4890
4891        dlm_lock_recovery(ls);
4892
4893        while (1) {
4894                lkb = del_proc_lock(ls, proc);
4895                if (!lkb)
4896                        break;
4897                del_timeout(lkb);
4898                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4899                        orphan_proc_lock(ls, lkb);
4900                else
4901                        unlock_proc_lock(ls, lkb);
4902
4903                /* this removes the reference for the proc->locks list
4904                   added by dlm_user_request, it may result in the lkb
4905                   being freed */
4906
4907                dlm_put_lkb(lkb);
4908        }
4909
4910        mutex_lock(&ls->ls_clear_proc_locks);
4911
4912        /* in-progress unlocks */
4913        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4914                list_del_init(&lkb->lkb_ownqueue);
4915                lkb->lkb_flags |= DLM_IFL_DEAD;
4916                dlm_put_lkb(lkb);
4917        }
4918
4919        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4920                lkb->lkb_ast_type = 0;
4921                list_del(&lkb->lkb_astqueue);
4922                dlm_put_lkb(lkb);
4923        }
4924
4925        mutex_unlock(&ls->ls_clear_proc_locks);
4926        dlm_unlock_recovery(ls);
4927}
4928
4929static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4930{
4931        struct dlm_lkb *lkb, *safe;
4932
4933        while (1) {
4934                lkb = NULL;
4935                spin_lock(&proc->locks_spin);
4936                if (!list_empty(&proc->locks)) {
4937                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
4938                                         lkb_ownqueue);
4939                        list_del_init(&lkb->lkb_ownqueue);
4940                }
4941                spin_unlock(&proc->locks_spin);
4942
4943                if (!lkb)
4944                        break;
4945
4946                lkb->lkb_flags |= DLM_IFL_DEAD;
4947                unlock_proc_lock(ls, lkb);
4948                dlm_put_lkb(lkb); /* ref from proc->locks list */
4949        }
4950
4951        spin_lock(&proc->locks_spin);
4952        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4953                list_del_init(&lkb->lkb_ownqueue);
4954                lkb->lkb_flags |= DLM_IFL_DEAD;
4955                dlm_put_lkb(lkb);
4956        }
4957        spin_unlock(&proc->locks_spin);
4958
4959        spin_lock(&proc->asts_spin);
4960        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4961                list_del(&lkb->lkb_astqueue);
4962                dlm_put_lkb(lkb);
4963        }
4964        spin_unlock(&proc->asts_spin);
4965}
4966
4967/* pid of 0 means purge all orphans */
4968
4969static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4970{
4971        struct dlm_lkb *lkb, *safe;
4972
4973        mutex_lock(&ls->ls_orphans_mutex);
4974        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4975                if (pid && lkb->lkb_ownpid != pid)
4976                        continue;
4977                unlock_proc_lock(ls, lkb);
4978                list_del_init(&lkb->lkb_ownqueue);
4979                dlm_put_lkb(lkb);
4980        }
4981        mutex_unlock(&ls->ls_orphans_mutex);
4982}
4983
4984static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4985{
4986        struct dlm_message *ms;
4987        struct dlm_mhandle *mh;
4988        int error;
4989
4990        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4991                                DLM_MSG_PURGE, &ms, &mh);
4992        if (error)
4993                return error;
4994        ms->m_nodeid = nodeid;
4995        ms->m_pid = pid;
4996
4997        return send_message(mh, ms);
4998}
4999
5000int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5001                   int nodeid, int pid)
5002{
5003        int error = 0;
5004
5005        if (nodeid != dlm_our_nodeid()) {
5006                error = send_purge(ls, nodeid, pid);
5007        } else {
5008                dlm_lock_recovery(ls);
5009                if (pid == current->pid)
5010                        purge_proc_locks(ls, proc);
5011                else
5012                        do_purge(ls, nodeid, pid);
5013                dlm_unlock_recovery(ls);
5014        }
5015        return error;
5016}
5017
5018