linux/fs/dlm/requestqueue.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13#include "dlm_internal.h"
  14#include "member.h"
  15#include "lock.h"
  16#include "dir.h"
  17#include "config.h"
  18#include "requestqueue.h"
  19
  /*
   * One saved message waiting on ls->ls_requestqueue.  Entries are allocated
   * by dlm_add_requestqueue() with extra room past the end of "request" for
   * the part of the message that exceeds sizeof(struct dlm_message), so
   * "request" has to remain the last member.
   */
  20struct rq_entry {
            /* link in ls->ls_requestqueue */
  21        struct list_head list;
            /* low 32 bits of ls->ls_recover_seq when the message was saved */
  22        uint32_t recover_seq;
            /* nodeid handed to dlm_add_requestqueue(); checked when purging */
  23        int nodeid;
            /* copy of the message; h_length bytes are copied in starting here */
  24        struct dlm_message request;
  25};
  26
  27/*
  28 * Requests received while the lockspace is in recovery get added to the
  29 * request queue and processed when recovery is complete.  This happens when
  30 * the lockspace is suspended on some nodes before it is on others, or the
  31 * lockspace is enabled on some while still suspended on others.
  32 */
  33
  34void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
  35{
  36        struct rq_entry *e;
  37        int length = ms->m_header.h_length - sizeof(struct dlm_message);
  38
  39        e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
  40        if (!e) {
  41                log_print("dlm_add_requestqueue: out of memory len %d", length);
  42                return;
  43        }
  44
  45        e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
  46        e->nodeid = nodeid;
  47        memcpy(&e->request, ms, ms->m_header.h_length);
  48
  49        mutex_lock(&ls->ls_requestqueue_mutex);
  50        list_add_tail(&e->list, &ls->ls_requestqueue);
  51        mutex_unlock(&ls->ls_requestqueue_mutex);
  52}
  53
  54/*
  55 * Called by dlm_recoverd to process normal messages saved while recovery was
  56 * happening.  Normal locking has been enabled before this is called.  dlm_recv
  57 * upon receiving a message, will wait for all saved messages to be drained
  58 * here before processing the message it got.  If a new dlm_ls_stop() arrives
  59 * while we're processing these saved messages, it may block trying to suspend
  60 * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.  In that
  61 * case, we don't abort since locking_stopped is still 0.  If dlm_recv is not
  62 * waiting for us, then this processing may be aborted due to locking_stopped.
  63 */
  64
  65int dlm_process_requestqueue(struct dlm_ls *ls)
  66{
  67        struct rq_entry *e;
  68        struct dlm_message *ms;
  69        int error = 0;
  70
  71        mutex_lock(&ls->ls_requestqueue_mutex);
  72
  73        for (;;) {
  74                if (list_empty(&ls->ls_requestqueue)) {
  75                        mutex_unlock(&ls->ls_requestqueue_mutex);
  76                        error = 0;
  77                        break;
  78                }
  79                e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
  80                mutex_unlock(&ls->ls_requestqueue_mutex);
  81
  82                ms = &e->request;
  83
  84                log_limit(ls, "dlm_process_requestqueue msg %d from %d "
  85                          "lkid %x remid %x result %d seq %u",
  86                          ms->m_type, ms->m_header.h_nodeid,
  87                          ms->m_lkid, ms->m_remid, ms->m_result,
  88                          e->recover_seq);
  89
  90                dlm_receive_message_saved(ls, &e->request, e->recover_seq);
  91
  92                mutex_lock(&ls->ls_requestqueue_mutex);
  93                list_del(&e->list);
  94                kfree(e);
  95
  96                if (dlm_locking_stopped(ls)) {
  97                        log_debug(ls, "process_requestqueue abort running");
  98                        mutex_unlock(&ls->ls_requestqueue_mutex);
  99                        error = -EINTR;
 100                        break;
 101                }
 102                schedule();
 103        }
 104
 105        return error;
 106}
 107
 108/*
 109 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 110 * saved requests and processes them as they would have been by dlm_recv.  At
 111 * the same time, dlm_recv will start receiving new requests from remote nodes.
 112 * We want to delay dlm_recv processing new requests until dlm_recoverd has
 113 * finished processing the old saved requests.  We don't check for locking
 114 * stopped here because dlm_ls_stop won't stop locking until it's suspended us
 115 * (dlm_recv).
 116 */
 117
 118void dlm_wait_requestqueue(struct dlm_ls *ls)
 119{
 120        for (;;) {
 121                mutex_lock(&ls->ls_requestqueue_mutex);
 122                if (list_empty(&ls->ls_requestqueue))
 123                        break;
 124                mutex_unlock(&ls->ls_requestqueue_mutex);
 125                schedule();
 126        }
 127        mutex_unlock(&ls->ls_requestqueue_mutex);
 128}
 129
 130static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
 131{
 132        uint32_t type = ms->m_type;
 133
 134        /* the ls is being cleaned up and freed by release_lockspace */
 135        if (!ls->ls_count)
 136                return 1;
 137
 138        if (dlm_is_removed(ls, nodeid))
 139                return 1;
 140
 141        /* directory operations are always purged because the directory is
 142           always rebuilt during recovery and the lookups resent */
 143
 144        if (type == DLM_MSG_REMOVE ||
 145            type == DLM_MSG_LOOKUP ||
 146            type == DLM_MSG_LOOKUP_REPLY)
 147                return 1;
 148
 149        if (!dlm_no_directory(ls))
 150                return 0;
 151
 152        return 1;
 153}
 154
 155void dlm_purge_requestqueue(struct dlm_ls *ls)
 156{
 157        struct dlm_message *ms;
 158        struct rq_entry *e, *safe;
 159
 160        mutex_lock(&ls->ls_requestqueue_mutex);
 161        list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 162                ms =  &e->request;
 163
 164                if (purge_request(ls, ms, e->nodeid)) {
 165                        list_del(&e->list);
 166                        kfree(e);
 167                }
 168        }
 169        mutex_unlock(&ls->ls_requestqueue_mutex);
 170}
 171
 172