// SPDX-License-Identifier: GPL-2.0
/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Client IO.
 *
 * Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include "../include/obd_class.h"
#include "../include/obd_support.h"
#include "../include/lustre_fid.h"
#include <linux/list.h>
#include <linux/sched.h>
#include "../include/cl_object.h"
#include "cl_internal.h"

/*****************************************************************************
 *
 * cl_io interface.
 *
 */

#define cl_io_for_each(slice, io) \
        list_for_each_entry((slice), &io->ci_layers, cis_linkage)
#define cl_io_for_each_reverse(slice, io) \
        list_for_each_entry_reverse((slice), &io->ci_layers, cis_linkage)

static inline int cl_io_type_is_valid(enum cl_io_type type)
{
        return CIT_READ <= type && type < CIT_OP_NR;
}

static inline int cl_io_is_loopable(const struct cl_io *io)
{
        return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
}

/**
 * Returns true iff there is an IO ongoing in the given environment.
 */
int cl_io_is_going(const struct lu_env *env)
{
        return cl_env_info(env)->clt_current_io != NULL;
}
EXPORT_SYMBOL(cl_io_is_going);

/**
 * cl_io invariant that holds at all times when exported cl_io_*() functions
 * are entered and left.
 */
static int cl_io_invariant(const struct cl_io *io)
{
        struct cl_io *up;

        up = io->ci_parent;
        return
                /*
                 * io can own pages only when it is ongoing. Sub-io might
                 * still be in CIS_LOCKED state when top-io is in
                 * CIS_IO_GOING.
                 */
                ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
                     (io->ci_state == CIS_LOCKED && up));
}

/**
 * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
 */
void cl_io_fini(const struct lu_env *env, struct cl_io *io)
{
        struct cl_io_slice *slice;
        struct cl_thread_info *info;

        LINVRNT(cl_io_type_is_valid(io->ci_type));
        LINVRNT(cl_io_invariant(io));

        while (!list_empty(&io->ci_layers)) {
                slice = container_of(io->ci_layers.prev, struct cl_io_slice,
                                     cis_linkage);
                list_del_init(&slice->cis_linkage);
                if (slice->cis_iop->op[io->ci_type].cio_fini)
                        slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
                /*
                 * Invalidate slice to catch use after free. This assumes that
                 * slices are allocated within session and can be touched
                 * after ->cio_fini() returns.
                 */
                slice->cis_io = NULL;
        }
        io->ci_state = CIS_FINI;
        info = cl_env_info(env);
        if (info->clt_current_io == io)
                info->clt_current_io = NULL;

        /* sanity check for layout change */
        switch (io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
                break;
        case CIT_FAULT:
        case CIT_FSYNC:
                LASSERT(!io->ci_need_restart);
                break;
        case CIT_SETATTR:
        case CIT_MISC:
                /* Check ignore layout change conf */
                LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
                             !io->ci_need_restart));
                break;
        default:
                LBUG();
        }
}
EXPORT_SYMBOL(cl_io_fini);

static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
                       enum cl_io_type iot, struct cl_object *obj)
{
        struct cl_object *scan;
        int result;

        LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
        LINVRNT(cl_io_type_is_valid(iot));
        LINVRNT(cl_io_invariant(io));

        io->ci_type = iot;
        INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
        INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
        INIT_LIST_HEAD(&io->ci_lockset.cls_done);
        INIT_LIST_HEAD(&io->ci_layers);

        result = 0;
        cl_object_for_each(scan, obj) {
                if (scan->co_ops->coo_io_init) {
                        result = scan->co_ops->coo_io_init(env, scan, io);
                        if (result != 0)
                                break;
                }
        }
        if (result == 0)
                io->ci_state = CIS_INIT;
        return result;
}

/**
 * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
 *
 * \pre obj != cl_object_top(obj)
 */
int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
                   enum cl_io_type iot, struct cl_object *obj)
{
        struct cl_thread_info *info = cl_env_info(env);

        LASSERT(obj != cl_object_top(obj));
        if (!info->clt_current_io)
                info->clt_current_io = io;
        return cl_io_init0(env, io, iot, obj);
}
EXPORT_SYMBOL(cl_io_sub_init);

/**
 * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
 *
 * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
 * what the latter returned.
 *
 * \pre obj == cl_object_top(obj)
 * \pre cl_io_type_is_valid(iot)
 * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
 */
int cl_io_init(const struct lu_env *env, struct cl_io *io,
               enum cl_io_type iot, struct cl_object *obj)
{
        struct cl_thread_info *info = cl_env_info(env);

        LASSERT(obj == cl_object_top(obj));
        LASSERT(!info->clt_current_io);

        info->clt_current_io = io;
        return cl_io_init0(env, io, iot, obj);
}
EXPORT_SYMBOL(cl_io_init);

/**
 * Initialize read or write io.
 *
 * \pre iot == CIT_READ || iot == CIT_WRITE
 */
int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
                  enum cl_io_type iot, loff_t pos, size_t count)
{
        LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
        LINVRNT(io->ci_obj);

        LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
                         "io range: %u [%llu, %llu) %u %u\n",
                         iot, (__u64)pos, (__u64)pos + count,
                         io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
        io->u.ci_rw.crw_pos = pos;
        io->u.ci_rw.crw_count = count;
        return cl_io_init(env, io, iot, io->ci_obj);
}
EXPORT_SYMBOL(cl_io_rw_init);
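
/*
 * Example (a minimal sketch, not code from this file): a top-level caller
 * typically pairs cl_io_rw_init() with cl_io_loop() and cl_io_fini(). How
 * the env and io are obtained is caller-specific and assumed here; "clob"
 * stands for the file's top cl_object:
 *
 *      io->ci_obj = clob;
 *      if (cl_io_rw_init(env, io, CIT_WRITE, pos, count) == 0)
 *              rc = cl_io_loop(env, io);
 *      else
 *              rc = io->ci_result;
 *      cl_io_fini(env, io);    /- mandatory even when init failed -/
 */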

static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
                              const struct cl_lock_descr *d1)
{
        return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
                          lu_object_fid(&d1->cld_obj->co_lu)) ?:
                __diff_normalize(d0->cld_start, d1->cld_start);
}

/*
 * Compare two lock descriptors: descriptors with equal fids and overlapping
 * extents compare equal; otherwise order by fid, then by extent position.
 */
static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
                             const struct cl_lock_descr *d1)
{
        int ret;

        ret = lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
                         lu_object_fid(&d1->cld_obj->co_lu));
        if (ret)
                return ret;
        if (d0->cld_end < d1->cld_start)
                return -1;
        if (d0->cld_start > d1->cld_end)
                return 1;
        return 0;
}

static void cl_lock_descr_merge(struct cl_lock_descr *d0,
                                const struct cl_lock_descr *d1)
{
        d0->cld_start = min(d0->cld_start, d1->cld_start);
        d0->cld_end = max(d0->cld_end, d1->cld_end);

        if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
                d0->cld_mode = CLM_WRITE;

        if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
                d0->cld_mode = CLM_GROUP;
}

/*
 * Sort locks in lexicographical order of their (fid, start-offset) pairs.
 */
static void cl_io_locks_sort(struct cl_io *io)
{
        int done = 0;

        /* hidden treasure: bubble sort for now. */
        do {
                struct cl_io_lock_link *curr;
                struct cl_io_lock_link *prev;
                struct cl_io_lock_link *temp;

                done = 1;
                prev = NULL;

                list_for_each_entry_safe(curr, temp,
                                         &io->ci_lockset.cls_todo,
                                         cill_linkage) {
                        if (prev) {
                                switch (cl_lock_descr_sort(&prev->cill_descr,
                                                           &curr->cill_descr)) {
                                case 0:
                                        /*
                                         * IMPOSSIBLE: Identical locks are
                                         *             already removed at
                                         *             this point.
                                         */
                                default:
                                        LBUG();
                                case 1:
                                        list_move_tail(&curr->cill_linkage,
                                                       &prev->cill_linkage);
                                        done = 0;
                                        continue; /* don't change prev: it's
                                                   * still "previous"
                                                   */
                                case -1: /* already in order */
                                        break;
                                }
                        }
                        prev = curr;
                }
        } while (!done);
}

/**
 * Check whether \a queue contains locks matching \a need.
 *
 * \retval +ve there is a matching lock in the \a queue
 * \retval 0 there are no matching locks in the \a queue
 */
int cl_queue_match(const struct list_head *queue,
                   const struct cl_lock_descr *need)
{
        struct cl_io_lock_link *scan;

        list_for_each_entry(scan, queue, cill_linkage) {
                if (cl_lock_descr_match(&scan->cill_descr, need))
                        return 1;
        }
        return 0;
}
EXPORT_SYMBOL(cl_queue_match);

static int cl_queue_merge(const struct list_head *queue,
                          const struct cl_lock_descr *need)
{
        struct cl_io_lock_link *scan;

        list_for_each_entry(scan, queue, cill_linkage) {
                if (cl_lock_descr_cmp(&scan->cill_descr, need))
                        continue;
                cl_lock_descr_merge(&scan->cill_descr, need);
                CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
                       scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
                       scan->cill_descr.cld_end);
                return 1;
        }
        return 0;
}

static int cl_lockset_match(const struct cl_lockset *set,
                            const struct cl_lock_descr *need)
{
        return cl_queue_match(&set->cls_curr, need) ||
               cl_queue_match(&set->cls_done, need);
}

static int cl_lockset_merge(const struct cl_lockset *set,
                            const struct cl_lock_descr *need)
{
        return cl_queue_merge(&set->cls_todo, need) ||
               cl_lockset_match(set, need);
}

static int cl_lockset_lock_one(const struct lu_env *env,
                               struct cl_io *io, struct cl_lockset *set,
                               struct cl_io_lock_link *link)
{
        struct cl_lock *lock;
        int result;

        lock = cl_lock_request(env, io, &link->cill_descr, "io", io);

        if (!IS_ERR(lock)) {
                link->cill_lock = lock;
                list_move(&link->cill_linkage, &set->cls_curr);
                if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
                        result = cl_wait(env, lock);
                        if (result == 0)
                                list_move(&link->cill_linkage, &set->cls_done);
                } else
                        result = 0;
        } else
                result = PTR_ERR(lock);
        return result;
}

static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
                              struct cl_io_lock_link *link)
{
        struct cl_lock *lock = link->cill_lock;

        list_del_init(&link->cill_linkage);
        if (lock) {
                cl_lock_release(env, lock, "io", io);
                link->cill_lock = NULL;
        }
        if (link->cill_fini)
                link->cill_fini(env, link);
}

static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
                           struct cl_lockset *set)
{
        struct cl_io_lock_link *link;
        struct cl_io_lock_link *temp;
        struct cl_lock *lock;
        int result;

        result = 0;
        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
                if (!cl_lockset_match(set, &link->cill_descr)) {
                        /* XXX some locking to guarantee that locks aren't
                         * expanded in between.
                         */
                        result = cl_lockset_lock_one(env, io, set, link);
                        if (result != 0)
                                break;
                } else
                        cl_lock_link_fini(env, io, link);
        }
        if (result == 0) {
                list_for_each_entry_safe(link, temp,
                                         &set->cls_curr, cill_linkage) {
                        lock = link->cill_lock;
                        result = cl_wait(env, lock);
                        if (result == 0)
                                list_move(&link->cill_linkage, &set->cls_done);
                        else
                                break;
                }
        }
        return result;
}

/**
 * Takes locks necessary for the current iteration of io.
 *
 * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
 * by layers for the current iteration. Then sort locks (to avoid dead-locks),
 * and acquire them.
 */
int cl_io_lock(const struct lu_env *env, struct cl_io *io)
{
        const struct cl_io_slice *scan;
        int result = 0;

        LINVRNT(cl_io_is_loopable(io));
        LINVRNT(io->ci_state == CIS_IT_STARTED);
        LINVRNT(cl_io_invariant(io));

        cl_io_for_each(scan, io) {
                if (!scan->cis_iop->op[io->ci_type].cio_lock)
                        continue;
                result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
                if (result != 0)
                        break;
        }
        if (result == 0) {
                cl_io_locks_sort(io);
                result = cl_lockset_lock(env, io, &io->ci_lockset);
        }
        if (result != 0)
                cl_io_unlock(env, io);
        else
                io->ci_state = CIS_LOCKED;
        return result;
}
EXPORT_SYMBOL(cl_io_lock);

/**
 * Releases locks taken by cl_io_lock().
 */
void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
{
        struct cl_lockset *set;
        struct cl_io_lock_link *link;
        struct cl_io_lock_link *temp;
        const struct cl_io_slice *scan;

        LASSERT(cl_io_is_loopable(io));
        LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
        LINVRNT(cl_io_invariant(io));

        set = &io->ci_lockset;

        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
                cl_lock_link_fini(env, io, link);

        list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
                cl_lock_link_fini(env, io, link);

        list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
                cl_unuse(env, link->cill_lock);
                cl_lock_link_fini(env, io, link);
        }
        cl_io_for_each_reverse(scan, io) {
                if (scan->cis_iop->op[io->ci_type].cio_unlock)
                        scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
        }
        io->ci_state = CIS_UNLOCKED;
        LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
}
EXPORT_SYMBOL(cl_io_unlock);

/**
 * Prepares next iteration of io.
 *
 * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
 * layers a chance to modify io parameters, e.g., so that lov can restrict io
 * to a single stripe.
 */
int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
{
        const struct cl_io_slice *scan;
        int result;

        LINVRNT(cl_io_is_loopable(io));
        LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
        LINVRNT(cl_io_invariant(io));

        result = 0;
        cl_io_for_each(scan, io) {
                if (!scan->cis_iop->op[io->ci_type].cio_iter_init)
                        continue;
                result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
                                                                      scan);
                if (result != 0)
                        break;
        }
        if (result == 0)
                io->ci_state = CIS_IT_STARTED;
        return result;
}
EXPORT_SYMBOL(cl_io_iter_init);

/**
 * Finalizes io iteration.
 *
 * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
 */
void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
{
        const struct cl_io_slice *scan;

        LINVRNT(cl_io_is_loopable(io));
        LINVRNT(io->ci_state == CIS_UNLOCKED);
        LINVRNT(cl_io_invariant(io));

        cl_io_for_each_reverse(scan, io) {
                if (scan->cis_iop->op[io->ci_type].cio_iter_fini)
                        scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
        }
        io->ci_state = CIS_IT_ENDED;
}
EXPORT_SYMBOL(cl_io_iter_fini);

/**
 * Records that read or write io progressed \a nob bytes forward.
 */
static void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io,
                             size_t nob)
{
        const struct cl_io_slice *scan;

        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
                nob == 0);
        LINVRNT(cl_io_is_loopable(io));
        LINVRNT(cl_io_invariant(io));

        io->u.ci_rw.crw_pos += nob;
        io->u.ci_rw.crw_count -= nob;

        /* layers have to be notified. */
        cl_io_for_each_reverse(scan, io) {
                if (scan->cis_iop->op[io->ci_type].cio_advance)
                        scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
                                                                   nob);
        }
}

/**
 * Adds a lock to a lockset.
 */
int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
                   struct cl_io_lock_link *link)
{
        int result;

        if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
                result = 1;
        else {
                list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
                result = 0;
        }
        return result;
}
EXPORT_SYMBOL(cl_io_lock_add);

static void cl_free_io_lock_link(const struct lu_env *env,
                                 struct cl_io_lock_link *link)
{
        kfree(link);
}

/**
 * Allocates new lock link, and uses it to add a lock to a lockset.
 */
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
                         struct cl_lock_descr *descr)
{
        struct cl_io_lock_link *link;
        int result;

        link = kzalloc(sizeof(*link), GFP_NOFS);
        if (link) {
                link->cill_descr = *descr;
                link->cill_fini = cl_free_io_lock_link;
                result = cl_io_lock_add(env, io, link);
                if (result) /* lock match */
                        link->cill_fini(env, link);
        } else
                result = -ENOMEM;

        return result;
}
EXPORT_SYMBOL(cl_io_lock_alloc_add);
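
/*
 * Example (a minimal sketch): a layer's ->cio_lock() method usually fills a
 * cl_lock_descr for the extent it needs and queues it here. "foo_io_lock"
 * and the index bounds are illustrative only; note that a return value of 1
 * from cl_io_lock_alloc_add() means the descriptor was merged into an
 * existing lock and is not an error:
 *
 *      static int foo_io_lock(const struct lu_env *env,
 *                             const struct cl_io_slice *ios)
 *      {
 *              struct cl_lock_descr descr = {
 *                      .cld_obj   = ios->cis_obj,
 *                      .cld_mode  = CLM_WRITE,
 *                      .cld_start = start_index,
 *                      .cld_end   = end_index,
 *              };
 *              int rc;
 *
 *              rc = cl_io_lock_alloc_add(env, ios->cis_io, &descr);
 *              return rc == 1 ? 0 : rc;
 *      }
 */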

/**
 * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
 */
int cl_io_start(const struct lu_env *env, struct cl_io *io)
{
        const struct cl_io_slice *scan;
        int result = 0;

        LINVRNT(cl_io_is_loopable(io));
        LINVRNT(io->ci_state == CIS_LOCKED);
        LINVRNT(cl_io_invariant(io));

        io->ci_state = CIS_IO_GOING;
        cl_io_for_each(scan, io) {
                if (!scan->cis_iop->op[io->ci_type].cio_start)
                        continue;
                result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
                if (result != 0)
                        break;
        }
        if (result >= 0)
                result = 0;
        return result;
}
EXPORT_SYMBOL(cl_io_start);

/**
 * Wakes up ongoing io by calling cl_io_operations::cio_end() bottom-to-top.
 */
void cl_io_end(const struct lu_env *env, struct cl_io *io)
{
        const struct cl_io_slice *scan;

        LINVRNT(cl_io_is_loopable(io));
        LINVRNT(io->ci_state == CIS_IO_GOING);
        LINVRNT(cl_io_invariant(io));

        cl_io_for_each_reverse(scan, io) {
                if (scan->cis_iop->op[io->ci_type].cio_end)
                        scan->cis_iop->op[io->ci_type].cio_end(env, scan);
                /* TODO: error handling. */
        }
        io->ci_state = CIS_IO_FINISHED;
}
EXPORT_SYMBOL(cl_io_end);

static const struct cl_page_slice *
cl_io_slice_page(const struct cl_io_slice *ios, struct cl_page *page)
{
        const struct cl_page_slice *slice;

        slice = cl_page_at(page, ios->cis_obj->co_lu.lo_dev->ld_type);
        LINVRNT(slice);
        return slice;
}

/**
 * True iff \a page is within \a io range.
 */
static int cl_page_in_io(const struct cl_page *page, const struct cl_io *io)
{
        int result = 1;
        loff_t start;
        loff_t end;
        pgoff_t idx;

        idx = page->cp_index;
        switch (io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
                /*
                 * check that [start, end) and [pos, pos + count) extents
                 * overlap.
                 */
                if (!cl_io_is_append(io)) {
                        const struct cl_io_rw_common *crw = &(io->u.ci_rw);

                        start = cl_offset(page->cp_obj, idx);
                        end = cl_offset(page->cp_obj, idx + 1);
                        result = crw->crw_pos < end &&
                                 start < crw->crw_pos + crw->crw_count;
                }
                break;
        case CIT_FAULT:
                result = io->u.ci_fault.ft_index == idx;
                break;
        default:
                LBUG();
        }
        return result;
}

/**
 * Called by read io, when page has to be read from the server.
 *
 * \see cl_io_operations::cio_read_page()
 */
int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
                    struct cl_page *page)
{
        const struct cl_io_slice *scan;
        struct cl_2queue *queue;
        int result = 0;

        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
        LINVRNT(cl_page_is_owned(page, io));
        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
        LINVRNT(cl_page_in_io(page, io));
        LINVRNT(cl_io_invariant(io));

        queue = &io->ci_queue;

        cl_2queue_init(queue);
        /*
         * ->cio_read_page() methods called in the loop below are supposed to
         * never block waiting for network (the only subtle point is the
         * creation of new pages for read-ahead that might result in cache
         * shrinking, but currently only clean pages are shrunk and this
         * requires no network io).
         *
         * Should this ever start blocking, a retry loop would be needed for
         * "parallel io" (see CLO_REPEAT loops in cl_lock.c).
         */
        cl_io_for_each(scan, io) {
                if (scan->cis_iop->cio_read_page) {
                        const struct cl_page_slice *slice;

                        slice = cl_io_slice_page(scan, page);
                        LINVRNT(slice);
                        result = scan->cis_iop->cio_read_page(env, scan, slice);
                        if (result != 0)
                                break;
                }
        }
        if (result == 0)
                result = cl_io_submit_rw(env, io, CRT_READ, queue);
        /*
         * Unlock unsent pages in case of error.
         */
        cl_page_list_disown(env, io, &queue->c2_qin);
        cl_2queue_fini(env, queue);
        return result;
}
EXPORT_SYMBOL(cl_io_read_page);

/**
 * Called by write io to prepare page to receive data from user buffer.
 *
 * \see cl_io_operations::cio_prepare_write()
 */
int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
                        struct cl_page *page, unsigned from, unsigned to)
{
        const struct cl_io_slice *scan;
        int result = 0;

        LINVRNT(io->ci_type == CIT_WRITE);
        LINVRNT(cl_page_is_owned(page, io));
        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
        LINVRNT(cl_io_invariant(io));
        LASSERT(cl_page_in_io(page, io));

        cl_io_for_each_reverse(scan, io) {
                if (scan->cis_iop->cio_prepare_write) {
                        const struct cl_page_slice *slice;

                        slice = cl_io_slice_page(scan, page);
                        result = scan->cis_iop->cio_prepare_write(env, scan,
                                                                  slice,
                                                                  from, to);
                        if (result != 0)
                                break;
                }
        }
        return result;
}
EXPORT_SYMBOL(cl_io_prepare_write);

/**
 * Called by write io after user data were copied into a page.
 *
 * \see cl_io_operations::cio_commit_write()
 */
int cl_io_commit_write(const struct lu_env *env, struct cl_io *io,
                       struct cl_page *page, unsigned from, unsigned to)
{
        const struct cl_io_slice *scan;
        int result = 0;

        LINVRNT(io->ci_type == CIT_WRITE);
        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
        LINVRNT(cl_io_invariant(io));
        /*
         * XXX Uh... not nice. Top level cl_io_commit_write() call (vvp->lov)
         * already called cl_page_cache_add(), moving page into CPS_CACHED
         * state. Better (and more general) way of dealing with such situation
         * is needed.
         */
        LASSERT(cl_page_is_owned(page, io) || page->cp_parent);
        LASSERT(cl_page_in_io(page, io));

        cl_io_for_each(scan, io) {
                if (scan->cis_iop->cio_commit_write) {
                        const struct cl_page_slice *slice;

                        slice = cl_io_slice_page(scan, page);
                        result = scan->cis_iop->cio_commit_write(env, scan,
                                                                 slice,
                                                                 from, to);
                        if (result != 0)
                                break;
                }
        }
        LINVRNT(result <= 0);
        return result;
}
EXPORT_SYMBOL(cl_io_commit_write);

/**
 * Submits a list of pages for immediate io.
 *
 * After this function returns, the submitted pages have been moved to
 * queue->c2_qout; queue->c2_qin is left with the pages that did not need to
 * be, or failed to be, submitted.
 *
 * \returns 0 if at least one page was submitted, error code otherwise.
 * \see cl_io_operations::cio_submit()
 */
int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
                    enum cl_req_type crt, struct cl_2queue *queue)
{
        const struct cl_io_slice *scan;
        int result = 0;

        LINVRNT(crt < ARRAY_SIZE(scan->cis_iop->req_op));

        cl_io_for_each(scan, io) {
                if (!scan->cis_iop->req_op[crt].cio_submit)
                        continue;
                result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
                                                               queue);
                if (result != 0)
                        break;
        }
        /*
         * If ->cio_submit() failed, no pages were sent.
         */
        LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
        return result;
}
EXPORT_SYMBOL(cl_io_submit_rw);

/**
 * Submit a sync_io and wait for the IO to be finished, or error happens.
 */
int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
                      enum cl_req_type iot, struct cl_2queue *queue,
                      long timeout)
{
        struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
        struct cl_page *pg;
        int rc;

        cl_page_list_for_each(pg, &queue->c2_qin) {
                LASSERT(!pg->cp_sync_io);
                pg->cp_sync_io = anchor;
        }

        cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
        rc = cl_io_submit_rw(env, io, iot, queue);
        if (rc == 0) {
                /*
                 * If some pages weren't sent for any reason (e.g.,
                 * read found up-to-date pages in the cache, or write found
                 * clean pages), count them as completed to avoid infinite
                 * wait.
                 */
                cl_page_list_for_each(pg, &queue->c2_qin) {
                        pg->cp_sync_io = NULL;
                        cl_sync_io_note(anchor, 1);
                }

                /* wait for the IO to be finished. */
                rc = cl_sync_io_wait(env, io, &queue->c2_qout,
                                     anchor, timeout);
        } else {
                LASSERT(list_empty(&queue->c2_qout.pl_pages));
                cl_page_list_for_each(pg, &queue->c2_qin)
                        pg->cp_sync_io = NULL;
        }
        return rc;
}
EXPORT_SYMBOL(cl_io_submit_sync);
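
/*
 * Example (a minimal sketch): synchronously reading one page that the caller
 * already owns; error handling is trimmed, and a timeout of 0 means wait
 * indefinitely:
 *
 *      struct cl_2queue *queue = &io->ci_queue;
 *
 *      cl_2queue_init_page(queue, page);
 *      rc = cl_io_submit_sync(env, io, CRT_READ, queue, 0);
 *      cl_2queue_disown(env, io, queue);
 *      cl_2queue_fini(env, queue);
 */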

/**
 * Cancel an IO which has been submitted by cl_io_submit_rw.
 */
static int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
                        struct cl_page_list *queue)
{
        struct cl_page *page;
        int result = 0;

        CERROR("Canceling ongoing page transmission\n");
        cl_page_list_for_each(page, queue) {
                int rc;

                LINVRNT(cl_page_in_io(page, io));
                rc = cl_page_cancel(env, page);
                result = result ?: rc;
        }
        return result;
}

/**
 * Main io loop.
 *
 * Pumps io through iterations calling
 *
 *    - cl_io_iter_init(),
 *
 *    - cl_io_lock(),
 *
 *    - cl_io_start(),
 *
 *    - cl_io_end(),
 *
 *    - cl_io_unlock(),
 *
 *    - cl_io_iter_fini()
 *
 * repeatedly until there is no more io to do.
 */
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
        int result = 0;

        LINVRNT(cl_io_is_loopable(io));

        do {
                size_t nob;

                io->ci_continue = 0;
                result = cl_io_iter_init(env, io);
                if (result == 0) {
                        nob = io->ci_nob;
                        result = cl_io_lock(env, io);
                        if (result == 0) {
                                /*
                                 * Notify layers that locks has been taken,
                                 * and do actual i/o.
                                 *
                                 *   - llite: kms, short read;
                                 *   - llite: generic_file_read();
                                 */
                                result = cl_io_start(env, io);
                                /*
                                 * Send any remaining pending
                                 * io, etc.
                                 *
                                 *   - llite: ll_rw_stats_tally.
                                 */
                                cl_io_end(env, io);
                                cl_io_unlock(env, io);
                                cl_io_rw_advance(env, io, io->ci_nob - nob);
                        }
                }
                cl_io_iter_fini(env, io);
        } while (result == 0 && io->ci_continue);
        if (result == 0)
                result = io->ci_result;
        return result < 0 ? result : 0;
}
EXPORT_SYMBOL(cl_io_loop);
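
/*
 * For reference, one successful iteration of the loop above moves the io
 * through the following states, each set by the helper named next to it:
 *
 *      CIS_INIT ->(cl_io_iter_init) CIS_IT_STARTED
 *               ->(cl_io_lock)      CIS_LOCKED
 *               ->(cl_io_start)     CIS_IO_GOING
 *               ->(cl_io_end)       CIS_IO_FINISHED
 *               ->(cl_io_unlock)    CIS_UNLOCKED
 *               ->(cl_io_iter_fini) CIS_IT_ENDED
 *
 * and cl_io_fini() finally moves it to CIS_FINI.
 */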

/**
 * Adds io slice to the cl_io \a io.
 *
 * This is called by cl_object_operations::coo_io_init() methods to add a
 * per-layer state to the io. New state is added at the end of
 * cl_io::ci_layers list, that is, it is at the bottom of the stack.
 *
 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
 */
void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
                     struct cl_object *obj,
                     const struct cl_io_operations *ops)
{
        struct list_head *linkage = &slice->cis_linkage;

        LASSERT((!linkage->prev && !linkage->next) ||
                list_empty(linkage));

        list_add_tail(linkage, &io->ci_layers);
        slice->cis_io = io;
        slice->cis_obj = obj;
        slice->cis_iop = ops;
}
EXPORT_SYMBOL(cl_io_slice_add);
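
/*
 * Example (a minimal sketch): a cl_object_operations::coo_io_init() method
 * typically embeds a cl_io_slice in its per-layer io state and registers it
 * with cl_io_slice_add(); "foo_io", "foo_env_io" and "foo_io_ops" are
 * illustrative only:
 *
 *      static int foo_io_init(const struct lu_env *env,
 *                             struct cl_object *obj, struct cl_io *io)
 *      {
 *              struct foo_io *fio = foo_env_io(env);
 *
 *              cl_io_slice_add(io, &fio->fi_cl, obj, &foo_io_ops);
 *              return 0;
 *      }
 */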

/**
 * Initializes page list.
 */
void cl_page_list_init(struct cl_page_list *plist)
{
        plist->pl_nr = 0;
        INIT_LIST_HEAD(&plist->pl_pages);
        plist->pl_owner = current;
}
EXPORT_SYMBOL(cl_page_list_init);

/**
 * Adds a page to a page list.
 */
void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
{
        /*
         * it would be better to check that page is owned by "current" io, but
         * it is not passed here.
         */
        LASSERT(page->cp_owner);
        LINVRNT(plist->pl_owner == current);

        lockdep_off();
        mutex_lock(&page->cp_mutex);
        lockdep_on();
        LASSERT(list_empty(&page->cp_batch));
        list_add_tail(&page->cp_batch, &plist->pl_pages);
        ++plist->pl_nr;
        lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
        cl_page_get(page);
}
EXPORT_SYMBOL(cl_page_list_add);

/**
 * Removes a page from a page list.
 */
static void cl_page_list_del(const struct lu_env *env,
                             struct cl_page_list *plist, struct cl_page *page)
{
        LASSERT(plist->pl_nr > 0);
        LINVRNT(plist->pl_owner == current);

        list_del_init(&page->cp_batch);
        lockdep_off();
        mutex_unlock(&page->cp_mutex);
        lockdep_on();
        --plist->pl_nr;
        lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
        cl_page_put(env, page);
}

/**
 * Moves a page from one page list to another.
 */
void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
                       struct cl_page *page)
{
        LASSERT(src->pl_nr > 0);
        LINVRNT(dst->pl_owner == current);
        LINVRNT(src->pl_owner == current);

        list_move_tail(&page->cp_batch, &dst->pl_pages);
        --src->pl_nr;
        ++dst->pl_nr;
        lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
                      src, dst);
}
EXPORT_SYMBOL(cl_page_list_move);

/**
 * Splice \a list onto the tail of \a head, leaving \a list empty.
 */
void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
{
        struct cl_page *page;
        struct cl_page *tmp;

        LINVRNT(list->pl_owner == current);
        LINVRNT(head->pl_owner == current);

        cl_page_list_for_each_safe(page, tmp, list)
                cl_page_list_move(head, list, page);
}
EXPORT_SYMBOL(cl_page_list_splice);

void cl_page_disown0(const struct lu_env *env,
                     struct cl_io *io, struct cl_page *pg);

/**
 * Disowns pages in a queue.
 */
void cl_page_list_disown(const struct lu_env *env,
                         struct cl_io *io, struct cl_page_list *plist)
{
        struct cl_page *page;
        struct cl_page *temp;

        LINVRNT(plist->pl_owner == current);

        cl_page_list_for_each_safe(page, temp, plist) {
                LASSERT(plist->pl_nr > 0);

                list_del_init(&page->cp_batch);
                lockdep_off();
                mutex_unlock(&page->cp_mutex);
                lockdep_on();
                --plist->pl_nr;
                /*
                 * cl_page_disown0 rather than usual cl_page_disown() is used,
                 * because pages are possibly in CPS_FREEING state already
                 * due to the call to cl_page_list_discard().
                 */
                /*
                 * XXX cl_page_disown0() will fail if page is not locked.
                 */
                cl_page_disown0(env, io, page);
                lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
                              plist);
                cl_page_put(env, page);
        }
}
EXPORT_SYMBOL(cl_page_list_disown);

/**
 * Releases pages from queue.
 */
static void cl_page_list_fini(const struct lu_env *env,
                              struct cl_page_list *plist)
{
        struct cl_page *page;
        struct cl_page *temp;

        LINVRNT(plist->pl_owner == current);

        cl_page_list_for_each_safe(page, temp, plist)
                cl_page_list_del(env, plist, page);
        LASSERT(plist->pl_nr == 0);
}

/**
 * Assumes all pages in a queue.
 */
static void cl_page_list_assume(const struct lu_env *env,
                                struct cl_io *io, struct cl_page_list *plist)
{
        struct cl_page *page;

        LINVRNT(plist->pl_owner == current);

        cl_page_list_for_each(page, plist)
                cl_page_assume(env, io, page);
}

/**
 * Discards all pages in a queue.
 */
static void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
                                 struct cl_page_list *plist)
{
        struct cl_page *page;

        LINVRNT(plist->pl_owner == current);
        cl_page_list_for_each(page, plist)
                cl_page_discard(env, io, page);
}

/**
 * Initialize dual page queue.
 */
void cl_2queue_init(struct cl_2queue *queue)
{
        cl_page_list_init(&queue->c2_qin);
        cl_page_list_init(&queue->c2_qout);
}
EXPORT_SYMBOL(cl_2queue_init);

/**
 * Disown pages in both lists of a 2-queue.
 */
void cl_2queue_disown(const struct lu_env *env,
                      struct cl_io *io, struct cl_2queue *queue)
{
        cl_page_list_disown(env, io, &queue->c2_qin);
        cl_page_list_disown(env, io, &queue->c2_qout);
}
EXPORT_SYMBOL(cl_2queue_disown);

/**
 * Discard (truncate) pages in both lists of a 2-queue.
 */
void cl_2queue_discard(const struct lu_env *env,
                       struct cl_io *io, struct cl_2queue *queue)
{
        cl_page_list_discard(env, io, &queue->c2_qin);
        cl_page_list_discard(env, io, &queue->c2_qout);
}
EXPORT_SYMBOL(cl_2queue_discard);

/**
 * Finalize both page lists of a 2-queue.
 */
void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
{
        cl_page_list_fini(env, &queue->c2_qout);
        cl_page_list_fini(env, &queue->c2_qin);
}
EXPORT_SYMBOL(cl_2queue_fini);

/**
 * Initialize a 2-queue to contain \a page in its incoming page list.
 */
void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
{
        cl_2queue_init(queue);
        cl_page_list_add(&queue->c2_qin, page);
}
EXPORT_SYMBOL(cl_2queue_init_page);

/**
 * Returns top-level io.
 *
 * \see cl_object_top(), cl_page_top().
 */
struct cl_io *cl_io_top(struct cl_io *io)
{
        while (io->ci_parent)
                io = io->ci_parent;
        return io;
}
EXPORT_SYMBOL(cl_io_top);

/**
 * Adds request slice to the compound request.
 *
 * This is called by cl_device_operations::cdo_req_init() methods to add a
 * per-layer state to the request. New state is added at the end of
 * cl_req::crq_layers list, that is, it is at the bottom of the stack.
 *
 * \see cl_lock_slice_add(), cl_page_slice_add(), cl_io_slice_add()
 */
void cl_req_slice_add(struct cl_req *req, struct cl_req_slice *slice,
                      struct cl_device *dev,
                      const struct cl_req_operations *ops)
{
        list_add_tail(&slice->crs_linkage, &req->crq_layers);
        slice->crs_dev = dev;
        slice->crs_ops = ops;
        slice->crs_req = req;
}
EXPORT_SYMBOL(cl_req_slice_add);

static void cl_req_free(const struct lu_env *env, struct cl_req *req)
{
        unsigned i;

        LASSERT(list_empty(&req->crq_pages));
        LASSERT(req->crq_nrpages == 0);
        LINVRNT(list_empty(&req->crq_layers));
        LINVRNT(equi(req->crq_nrobjs > 0, req->crq_o));

        if (req->crq_o) {
                for (i = 0; i < req->crq_nrobjs; ++i) {
                        struct cl_object *obj = req->crq_o[i].ro_obj;

                        if (obj) {
                                lu_object_ref_del_at(&obj->co_lu,
                                                     &req->crq_o[i].ro_obj_ref,
                                                     "cl_req", req);
                                cl_object_put(env, obj);
                        }
                }
                kfree(req->crq_o);
        }
        kfree(req);
}

static int cl_req_init(const struct lu_env *env, struct cl_req *req,
                       struct cl_page *page)
{
        struct cl_device *dev;
        struct cl_page_slice *slice;
        int result;

        result = 0;
        page = cl_page_top(page);
        do {
                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
                        dev = lu2cl_dev(slice->cpl_obj->co_lu.lo_dev);
                        if (dev->cd_ops->cdo_req_init) {
                                result = dev->cd_ops->cdo_req_init(env,
                                                                   dev, req);
                                if (result != 0)
                                        break;
                        }
                }
                page = page->cp_child;
        } while (page && result == 0);
        return result;
}

/**
 * Invokes per-request transfer completion call-backs
 * (cl_req_operations::cro_completion()) bottom-to-top.
 */
void cl_req_completion(const struct lu_env *env, struct cl_req *req, int rc)
{
        struct cl_req_slice *slice;

        /*
         * for the lack of list_for_each_entry_reverse_safe()...
         */
        while (!list_empty(&req->crq_layers)) {
                slice = list_entry(req->crq_layers.prev,
                                   struct cl_req_slice, crs_linkage);
                list_del_init(&slice->crs_linkage);
                if (slice->crs_ops->cro_completion)
                        slice->crs_ops->cro_completion(env, slice, rc);
        }
        cl_req_free(env, req);
}
EXPORT_SYMBOL(cl_req_completion);

/**
 * Allocates new transfer request.
 */
struct cl_req *cl_req_alloc(const struct lu_env *env, struct cl_page *page,
                            enum cl_req_type crt, int nr_objects)
{
        struct cl_req *req;

        LINVRNT(nr_objects > 0);

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (req) {
                int result;

                req->crq_type = crt;
                INIT_LIST_HEAD(&req->crq_pages);
                INIT_LIST_HEAD(&req->crq_layers);

                req->crq_o = kcalloc(nr_objects, sizeof(req->crq_o[0]),
                                     GFP_NOFS);
                if (req->crq_o) {
                        req->crq_nrobjs = nr_objects;
                        result = cl_req_init(env, req, page);
                } else
                        result = -ENOMEM;
                if (result != 0) {
                        cl_req_completion(env, req, result);
                        req = ERR_PTR(result);
                }
        } else
                req = ERR_PTR(-ENOMEM);
        return req;
}
EXPORT_SYMBOL(cl_req_alloc);

/**
 * Adds a page to a request.
 */
void cl_req_page_add(const struct lu_env *env,
                     struct cl_req *req, struct cl_page *page)
{
        struct cl_object *obj;
        struct cl_req_obj *rqo;
        int i;

        page = cl_page_top(page);

        LASSERT(list_empty(&page->cp_flight));
        LASSERT(!page->cp_req);

        CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n",
                      req, req->crq_type, req->crq_nrpages);

        list_add_tail(&page->cp_flight, &req->crq_pages);
        ++req->crq_nrpages;
        page->cp_req = req;
        obj = cl_object_top(page->cp_obj);
        for (i = 0, rqo = req->crq_o; obj != rqo->ro_obj; ++i, ++rqo) {
                if (!rqo->ro_obj) {
                        rqo->ro_obj = obj;
                        cl_object_get(obj);
                        lu_object_ref_add_at(&obj->co_lu, &rqo->ro_obj_ref,
                                             "cl_req", req);
                        break;
                }
        }
        LASSERT(i < req->crq_nrobjs);
}
EXPORT_SYMBOL(cl_req_page_add);

/**
 * Removes a page from a request.
 */
void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
{
        struct cl_req *req = page->cp_req;

        page = cl_page_top(page);

        LASSERT(!list_empty(&page->cp_flight));
        LASSERT(req->crq_nrpages > 0);

        list_del_init(&page->cp_flight);
        --req->crq_nrpages;
        page->cp_req = NULL;
}
EXPORT_SYMBOL(cl_req_page_done);

/**
 * Notifies layers that request is about to depart by calling
 * cl_req_operations::cro_prep() top-to-bottom.
 */
int cl_req_prep(const struct lu_env *env, struct cl_req *req)
{
        int i;
        int result;
        const struct cl_req_slice *slice;

        /*
         * Check that by this point all object slots have been filled by
         * cl_req_page_add().
         */
        for (i = 0; i < req->crq_nrobjs; ++i)
                LASSERT(req->crq_o[i].ro_obj);

        result = 0;
        list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
                if (slice->crs_ops->cro_prep) {
                        result = slice->crs_ops->cro_prep(env, slice);
                        if (result != 0)
                                break;
                }
        }
        return result;
}
EXPORT_SYMBOL(cl_req_prep);

/**
 * Fills in attributes that are passed to server together with transfer. Only
 * attributes from \a flags may be touched. This can be called multiple times
 * for the same request.
 */
void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
                     struct cl_req_attr *attr, u64 flags)
{
        const struct cl_req_slice *slice;
        struct cl_page *page;
        int i;

        LASSERT(!list_empty(&req->crq_pages));

        /* Take any page to use as a model. */
        page = list_entry(req->crq_pages.next, struct cl_page, cp_flight);

        for (i = 0; i < req->crq_nrobjs; ++i) {
                list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
                        const struct cl_page_slice *scan;
                        const struct cl_object *obj;

                        scan = cl_page_at(page,
                                          slice->crs_dev->cd_lu_dev.ld_type);
                        obj = scan->cpl_obj;
                        if (slice->crs_ops->cro_attr_set)
                                slice->crs_ops->cro_attr_set(env, slice, obj,
                                                             attr + i, flags);
                }
        }
}
EXPORT_SYMBOL(cl_req_attr_set);

/**
 * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
 */
void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
{
        init_waitqueue_head(&anchor->csi_waitq);
        atomic_set(&anchor->csi_sync_nr, nrpages);
        atomic_set(&anchor->csi_barrier, nrpages > 0);
        anchor->csi_sync_rc = 0;
}
EXPORT_SYMBOL(cl_sync_io_init);
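
/*
 * The anchor protocol, in outline (a sketch; the submit path varies by
 * caller): the submitter initializes the anchor for N pages, every transfer
 * completion calls cl_sync_io_note() exactly once per page, and the
 * submitter blocks in cl_sync_io_wait() until all N completions arrive:
 *
 *      cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
 *      rc = cl_io_submit_rw(env, io, iot, queue);
 *      ...
 *      rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor, timeout);
 *
 * cl_io_submit_sync() above is the canonical user of this pattern.
 */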

/**
 * Wait until all transfer completes. Transfer completion routine has to call
 * cl_sync_io_note() for every page.
 */
int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
                    struct cl_page_list *queue, struct cl_sync_io *anchor,
                    long timeout)
{
        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
                                                  NULL, NULL, NULL);
        int rc;

        LASSERT(timeout >= 0);

        rc = l_wait_event(anchor->csi_waitq,
                          atomic_read(&anchor->csi_sync_nr) == 0,
                          &lwi);
        if (rc < 0) {
                CERROR("SYNC IO failed with error: %d, try to cancel %d remaining pages\n",
                       rc, atomic_read(&anchor->csi_sync_nr));

                (void)cl_io_cancel(env, io, queue);

                lwi = (struct l_wait_info) { 0 };
                (void)l_wait_event(anchor->csi_waitq,
                                   atomic_read(&anchor->csi_sync_nr) == 0,
                                   &lwi);
        } else {
                rc = anchor->csi_sync_rc;
        }
        LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
        cl_page_list_assume(env, io, queue);

        /* wait until cl_sync_io_note() has done wakeup */
        while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
                cpu_relax();
        }

        POISON(anchor, 0x5a, sizeof(*anchor));
        return rc;
}
EXPORT_SYMBOL(cl_sync_io_wait);

/**
 * Indicate that transfer of a single page completed.
 */
void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
{
        if (anchor->csi_sync_rc == 0 && ioret < 0)
                anchor->csi_sync_rc = ioret;
        /*
         * Synchronous IO done without releasing page lock (e.g., as a part of
         * ->{prepare,commit}_write()). Completion is used to signal the end
         * of IO.
         */
        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
        if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
                wake_up_all(&anchor->csi_waitq);
                /* it's safe to nuke or reuse anchor now */
                atomic_set(&anchor->csi_barrier, 0);
        }
}
EXPORT_SYMBOL(cl_sync_io_note);