1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#include <linux/module.h>
38
39#define DEBUG_SUBSYSTEM S_LLITE
40
41#include "llite_internal.h"
42
43
44void vvp_write_pending(struct vvp_object *club, struct vvp_page *page)
45{
46 struct ll_inode_info *lli = ll_i2info(club->vob_inode);
47
48 spin_lock(&lli->lli_lock);
49 lli->lli_flags |= LLIF_SOM_DIRTY;
50 if (page && list_empty(&page->vpg_pending_linkage))
51 list_add(&page->vpg_pending_linkage, &club->vob_pending_list);
52 spin_unlock(&lli->lli_lock);
53}
54
55
56void vvp_write_complete(struct vvp_object *club, struct vvp_page *page)
57{
58 struct ll_inode_info *lli = ll_i2info(club->vob_inode);
59 int rc = 0;
60
61 spin_lock(&lli->lli_lock);
62 if (page && !list_empty(&page->vpg_pending_linkage)) {
63 list_del_init(&page->vpg_pending_linkage);
64 rc = 1;
65 }
66 spin_unlock(&lli->lli_lock);
67 if (rc)
68 ll_queue_done_writing(club->vob_inode, 0);
69}
70
71
72
73
74
75void ll_queue_done_writing(struct inode *inode, unsigned long flags)
76{
77 struct ll_inode_info *lli = ll_i2info(inode);
78 struct vvp_object *club = cl2vvp(ll_i2info(inode)->lli_clob);
79
80 spin_lock(&lli->lli_lock);
81 lli->lli_flags |= flags;
82
83 if ((lli->lli_flags & LLIF_DONE_WRITING) &&
84 list_empty(&club->vob_pending_list)) {
85 struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
86
87 if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
88 CWARN("%s: file "DFID"(flags %u) Size-on-MDS valid, done writing allowed and no diry pages\n",
89 ll_get_fsname(inode->i_sb, NULL, 0),
90 PFID(ll_inode2fid(inode)), lli->lli_flags);
91
92 spin_lock(&lcq->lcq_lock);
93
94 LASSERT(list_empty(&lli->lli_close_list));
95 CDEBUG(D_INODE, "adding inode "DFID" to close list\n",
96 PFID(ll_inode2fid(inode)));
97 list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
98
99
100
101
102
103
104
105
106 lli->lli_flags &= ~LLIF_DONE_WRITING;
107
108 wake_up(&lcq->lcq_waitq);
109 spin_unlock(&lcq->lcq_lock);
110 }
111 spin_unlock(&lli->lli_lock);
112}
113
114
115void ll_done_writing_attr(struct inode *inode, struct md_op_data *op_data)
116{
117 struct ll_inode_info *lli = ll_i2info(inode);
118
119 op_data->op_flags |= MF_SOM_CHANGE;
120
121 if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
122 CERROR("%s: inode "DFID"(flags %u) MDS holds lock on Size-on-MDS attributes\n",
123 ll_get_fsname(inode->i_sb, NULL, 0),
124 PFID(ll_inode2fid(inode)), lli->lli_flags);
125
126 if (!cl_local_size(inode)) {
127
128 op_data->op_attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
129 ATTR_ATIME_SET | ATTR_SIZE | ATTR_BLOCKS;
130 }
131}
132
133
/*
 * Close the current I/O epoch on @inode and prepare @op_data for the
 * CLOSE/DONE_WRITING RPC.
 *
 * Three outcomes are possible:
 *  - dirty pages remain and no epoch close is pending yet: stash *och
 *    in lli_pending_och, set LLIF_EPOCH_PENDING, pin the inode with
 *    igrab() and return without packing anything;
 *  - dirty pages remain and this is the DONE_WRITING path: set
 *    LLIF_DONE_WRITING, pin the inode and defer (ll_queue_done_writing()
 *    will queue the inode once the last page completes);
 *  - no dirty pages: mark MF_EPOCH_CLOSE, clear LLIF_SOM_DIRTY and pack
 *    the Size-on-MDS attributes via ll_done_writing_attr().
 */
void ll_ioepoch_close(struct inode *inode, struct md_op_data *op_data,
		      struct obd_client_handle **och, unsigned long flags)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct vvp_object *club = cl2vvp(ll_i2info(inode)->lli_clob);

	spin_lock(&lli->lli_lock);
	if (!(list_empty(&club->vob_pending_list))) {
		if (!(lli->lli_flags & LLIF_EPOCH_PENDING)) {
			LASSERT(*och);
			LASSERT(!lli->lli_pending_och);
			/* Dirty pages are still outstanding and no close is
			 * pending yet: remember the open handle so the epoch
			 * can be closed later, and keep the inode alive with
			 * an extra reference until then. */
			lli->lli_flags |= LLIF_EPOCH_PENDING;
			lli->lli_pending_och = *och;
			spin_unlock(&lli->lli_lock);

			inode = igrab(inode);
			LASSERT(inode);
			goto out;
		}
		if (flags & LLIF_DONE_WRITING) {
			/* A close is already pending but pages are still
			 * dirty: it is too early to send DONE_WRITING.  Flag
			 * the inode so the write-completion path queues it
			 * (ll_queue_done_writing checks LLIF_DONE_WRITING),
			 * and pin the inode until that happens. */
			LASSERT(!(lli->lli_flags & LLIF_DONE_WRITING));
			lli->lli_flags |= LLIF_DONE_WRITING;
			spin_unlock(&lli->lli_lock);

			inode = igrab(inode);
			LASSERT(inode);
			goto out;
		}
	}
	CDEBUG(D_INODE, "Epoch %llu closed on "DFID"\n",
	       ll_i2info(inode)->lli_ioepoch, PFID(&lli->lli_fid));
	op_data->op_flags |= MF_EPOCH_CLOSE;

	if (flags & LLIF_DONE_WRITING) {
		/* DONE_WRITING path: hand the deferred open handle back to
		 * the caller and clear the pending-epoch state. */
		LASSERT(lli->lli_flags & LLIF_SOM_DIRTY);
		LASSERT(!(lli->lli_flags & LLIF_DONE_WRITING));
		*och = lli->lli_pending_och;
		lli->lli_pending_och = NULL;
		lli->lli_flags &= ~LLIF_EPOCH_PENDING;
	} else {
		/* Nothing to update on the MDS if Size-on-MDS state is not
		 * dirty: close the epoch without packing attributes. */
		if (!(lli->lli_flags & LLIF_SOM_DIRTY)) {
			spin_unlock(&lli->lli_lock);
			goto out;
		}
		/* An epoch close is already pending (the DONE_WRITING path
		 * will pack the attributes later): close the epoch here
		 * with no attribute change. */
		if (lli->lli_flags & LLIF_EPOCH_PENDING) {
			spin_unlock(&lli->lli_lock);
			goto out;
		}
	}

	LASSERT(list_empty(&club->vob_pending_list));
	lli->lli_flags &= ~LLIF_SOM_DIRTY;
	spin_unlock(&lli->lli_lock);
	ll_done_writing_attr(inode, op_data);

out:
	return;
}
204
205
206
207
208
/*
 * Send a Size-on-MDS attribute update for @inode to the MDS via a
 * setattr RPC.
 *
 * If the inode's current I/O epoch still matches op_data->op_ioepoch,
 * the attributes are first refreshed with ll_inode_getattr() and folded
 * into @op_data with md_from_obdo(); otherwise the update is sent with
 * whatever @op_data already holds.  Returns 0 on success or a negative
 * errno from md_setattr() (getattr failures are logged but the setattr
 * is still sent with no valid attribute bits).
 */
int ll_som_update(struct inode *inode, struct md_op_data *op_data)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct ptlrpc_request *request = NULL;
	__u32 old_flags;
	struct obdo *oa;
	int rc;

	LASSERT(op_data);
	/* Unexpected: the MDS should not hold the Size-on-MDS lock while
	 * the client is pushing a Size-on-MDS update. */
	if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
		CERROR("%s: inode "DFID"(flags %u) MDS holds lock on Size-on-MDS attributes\n",
		       ll_get_fsname(inode->i_sb, NULL, 0),
		       PFID(ll_inode2fid(inode)), lli->lli_flags);

	/* Zeroed so oa->o_valid starts with no attribute bits set. */
	oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
	if (!oa) {
		CERROR("can't allocate memory for Size-on-MDS update.\n");
		return -ENOMEM;
	}

	/* Preserve the caller's flags (only MF_GETATTR_LOCK is consulted
	 * below) and mark this RPC as a Size-on-MDS change. */
	old_flags = op_data->op_flags;
	op_data->op_flags = MF_SOM_CHANGE;

	/* Only refresh attributes if the epoch has not moved on. */
	if (lli->lli_ioepoch == op_data->op_ioepoch) {
		rc = ll_inode_getattr(inode, oa, op_data->op_ioepoch,
				      old_flags & MF_GETATTR_LOCK);
		if (rc) {
			/* On getattr failure, send the update with no
			 * valid attribute bits. */
			oa->o_valid = 0;
			if (rc != -ENOENT)
				CERROR("%s: inode_getattr failed - unable to send a Size-on-MDS attribute update for inode "DFID": rc = %d\n",
				       ll_get_fsname(inode->i_sb, NULL, 0),
				       PFID(ll_inode2fid(inode)), rc);
		} else {
			CDEBUG(D_INODE, "Size-on-MDS update on "DFID"\n",
			       PFID(&lli->lli_fid));
		}

		md_from_obdo(op_data, oa, oa->o_valid);
	}

	rc = md_setattr(ll_i2sbi(inode)->ll_md_exp, op_data,
			NULL, 0, NULL, 0, &request, NULL);
	ptlrpc_req_finished(request);

	kmem_cache_free(obdo_cachep, oa);
	return rc;
}
257
258
259
260
261
262static void ll_prepare_done_writing(struct inode *inode,
263 struct md_op_data *op_data,
264 struct obd_client_handle **och)
265{
266 ll_ioepoch_close(inode, op_data, och, LLIF_DONE_WRITING);
267
268 if (!*och)
269 return;
270
271 ll_pack_inode2opdata(inode, op_data, &(*och)->och_fh);
272 ll_prep_md_op_data(op_data, inode, NULL, NULL,
273 0, 0, LUSTRE_OPC_ANY, NULL);
274}
275
276
277static void ll_done_writing(struct inode *inode)
278{
279 struct obd_client_handle *och = NULL;
280 struct md_op_data *op_data;
281 int rc;
282
283 LASSERT(exp_connect_som(ll_i2mdexp(inode)));
284
285 op_data = kzalloc(sizeof(*op_data), GFP_NOFS);
286 if (!op_data)
287 return;
288
289 ll_prepare_done_writing(inode, op_data, &och);
290
291 if (!och)
292 goto out;
293
294 rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, NULL);
295 if (rc == -EAGAIN)
296
297
298
299 rc = ll_som_update(inode, op_data);
300 else if (rc) {
301 CERROR("%s: inode "DFID" mdc done_writing failed: rc = %d\n",
302 ll_get_fsname(inode->i_sb, NULL, 0),
303 PFID(ll_inode2fid(inode)), rc);
304 }
305out:
306 ll_finish_md_op_data(op_data);
307 if (och) {
308 md_clear_open_replay_data(ll_i2sbi(inode)->ll_md_exp, och);
309 kfree(och);
310 }
311}
312
313static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
314{
315 struct ll_inode_info *lli = NULL;
316
317 spin_lock(&lcq->lcq_lock);
318
319 if (!list_empty(&lcq->lcq_head)) {
320 lli = list_entry(lcq->lcq_head.next, struct ll_inode_info,
321 lli_close_list);
322 list_del_init(&lli->lli_close_list);
323 } else if (atomic_read(&lcq->lcq_stop)) {
324 lli = ERR_PTR(-EALREADY);
325 }
326
327 spin_unlock(&lcq->lcq_lock);
328 return lli;
329}
330
331static int ll_close_thread(void *arg)
332{
333 struct ll_close_queue *lcq = arg;
334
335 complete(&lcq->lcq_comp);
336
337 while (1) {
338 struct l_wait_info lwi = { 0 };
339 struct ll_inode_info *lli;
340 struct inode *inode;
341
342 l_wait_event_exclusive(lcq->lcq_waitq,
343 (lli = ll_close_next_lli(lcq)) != NULL,
344 &lwi);
345 if (IS_ERR(lli))
346 break;
347
348 inode = ll_info2i(lli);
349 CDEBUG(D_INFO, "done_writing for inode "DFID"\n",
350 PFID(ll_inode2fid(inode)));
351 ll_done_writing(inode);
352 iput(inode);
353 }
354
355 CDEBUG(D_INFO, "ll_close exiting\n");
356 complete(&lcq->lcq_comp);
357 return 0;
358}
359
360int ll_close_thread_start(struct ll_close_queue **lcq_ret)
361{
362 struct ll_close_queue *lcq;
363 struct task_struct *task;
364
365 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CLOSE_THREAD))
366 return -EINTR;
367
368 lcq = kzalloc(sizeof(*lcq), GFP_NOFS);
369 if (!lcq)
370 return -ENOMEM;
371
372 spin_lock_init(&lcq->lcq_lock);
373 INIT_LIST_HEAD(&lcq->lcq_head);
374 init_waitqueue_head(&lcq->lcq_waitq);
375 init_completion(&lcq->lcq_comp);
376
377 task = kthread_run(ll_close_thread, lcq, "ll_close");
378 if (IS_ERR(task)) {
379 kfree(lcq);
380 return PTR_ERR(task);
381 }
382
383 wait_for_completion(&lcq->lcq_comp);
384 *lcq_ret = lcq;
385 return 0;
386}
387
/*
 * Stop the close thread and free its queue.
 *
 * lcq_comp is re-armed first (it was completed at thread startup), then
 * lcq_stop is raised so ll_close_next_lli() returns ERR_PTR(-EALREADY)
 * once the queue drains; the thread is woken, we wait for it to signal
 * exit via lcq_comp, and only then free the queue.  The statement order
 * matters: re-init must precede the wake-up.
 */
void ll_close_thread_shutdown(struct ll_close_queue *lcq)
{
	init_completion(&lcq->lcq_comp);
	atomic_inc(&lcq->lcq_stop);
	wake_up(&lcq->lcq_waitq);
	wait_for_completion(&lcq->lcq_comp);
	kfree(lcq);
}
396