1#include <linux/ceph/ceph_debug.h>
2
3#include <linux/backing-dev.h>
4#include <linux/fs.h>
5#include <linux/mm.h>
6#include <linux/pagemap.h>
7#include <linux/writeback.h>
8#include <linux/slab.h>
9#include <linux/pagevec.h>
10#include <linux/task_io_accounting_ops.h>
11
12#include "super.h"
13#include "mds_client.h"
14#include <linux/ceph/osd_client.h>
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
 * Writeback congestion thresholds, in pages.  congestion_kb is the
 * mount option value in KB; >> (PAGE_SHIFT-10) converts KB to pages.
 * The "off" threshold is 3/4 of the "on" threshold, providing
 * hysteresis so we don't flap the bdi congested state.
 */
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
56
57static inline struct ceph_snap_context *page_snap_context(struct page *page)
58{
59 if (PagePrivate(page))
60 return (void *)page->private;
61 return NULL;
62}
63
64
65
66
67
/*
 * Dirty a page.  Adjust the per-inode dirty-page accounting under
 * i_ceph_lock and stash a ref to the current snap context in
 * page->private so writeback knows which snapshot the data belongs to.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;
	int ret;

	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); /* caller must hold a write cap ref */
	if (__ceph_have_pending_cap_snap(ci)) {
		/* a cap snap is pending: account this page against the
		 * most recent capsnap's snap context */
		struct ceph_cap_snap *capsnap =
			list_last_entry(&ci->i_cap_snaps,
					struct ceph_cap_snap,
					ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		/* no pending capsnap: dirty against the head context */
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);	/* first dirty buffer pins the inode */
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference the snap context from page->private and set
	 * PagePrivate so we get the invalidatepage callback, which
	 * drops the accounting taken above.
	 */
	BUG_ON(PagePrivate(page));
	page->private = (unsigned long)snapc;
	SetPagePrivate(page);

	ret = __set_page_dirty_nobuffers(page);
	WARN_ON(!PageLocked(page));
	WARN_ON(!page->mapping);

	return ret;
}
129
130
131
132
133
134
/*
 * Invalidate (part of) a page.  Only a full-page invalidation
 * (offset == 0) undoes the dirty accounting and releases the snap
 * context taken by ceph_set_page_dirty(); a partial invalidation is
 * just logged and leaves page->private in place.
 */
static void ceph_invalidatepage(struct page *page, unsigned long offset)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc = page_snap_context(page);

	inode = page->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0) {
		dout("%p invalidatepage %p idx %lu partial dirty page %lu\n",
		     inode, page, page->index, offset);
		return;
	}

	WARN_ON(!PageLocked(page));
	if (!PagePrivate(page))
		return;	/* no snap context attached; nothing to undo */

	ClearPageChecked(page);

	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	/* drop the wrbuffer ref and the page's snap context reference */
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
	page->private = 0;
	ClearPagePrivate(page);
}
164
165
166static int ceph_releasepage(struct page *page, gfp_t g)
167{
168 dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
169 page, page->index, PageDirty(page) ? "" : "not ");
170
171 return !PagePrivate(page);
172}
173
174
175
176
/*
 * Read a single page synchronously from the OSDs.  The page is left
 * locked on return; the caller is responsible for unlocking it.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int err = 0;
	u64 off = page_offset(page);
	u64 len = PAGE_CACHE_SIZE;

	if (off >= i_size_read(inode)) {
		/* entirely past EOF: hand back a zeroed, uptodate page */
		zero_user_segment(page, 0, PAGE_CACHE_SIZE);
		SetPageUptodate(page);
		return 0;
	}

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Inline data: the first page cannot be read here
		 * (-EINVAL); pages beyond the inline region are all
		 * zeroes.
		 */
		if (off == 0)
			return -EINVAL;
		zero_user_segment(page, 0, PAGE_CACHE_SIZE);
		SetPageUptodate(page);
		return 0;
	}

	dout("readpage inode %p file %p page %p index %lu\n",
	     inode, filp, page, page->index);
	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
				  off, &len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  &page, 1, 0);
	if (err == -ENOENT)
		err = 0;	/* object doesn't exist: treat as a hole */
	if (err < 0) {
		SetPageError(page);
		goto out;
	}

	if (err < PAGE_CACHE_SIZE)
		/* zero the trailing part a short read did not fill */
		zero_user_segment(page, err, PAGE_CACHE_SIZE);
	else
		flush_dcache_page(page);

	SetPageUptodate(page);
out:
	return err < 0 ? err : 0;
}
228
/*
 * address_space_operations.readpage: do the read, then drop the page
 * lock that readpage_nounlock() deliberately leaves held.
 */
static int ceph_readpage(struct file *filp, struct page *page)
{
	int ret;

	ret = readpage_nounlock(filp, page);
	unlock_page(page);
	return ret;
}
235
236
237
238
/*
 * Completion callback for an async readpages request: mark the pages
 * uptodate (zero-filling any tail the OSD did not return), then unlock
 * and release them, and free the page vector.
 */
static void finish_read(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_osd_data *osd_data;
	int rc = req->r_result <= 0 ? req->r_result : 0;
	int bytes = req->r_result >= 0 ? req->r_result : 0;
	int num_pages;
	int i;

	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

	/* unlock all pages, zeroing any data we didn't read */
	osd_data = osd_req_op_extent_osd_data(req, 0);
	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
	num_pages = calc_pages_for((u64)osd_data->alignment,
					(u64)osd_data->length);
	for (i = 0; i < num_pages; i++) {
		struct page *page = osd_data->pages[i];

		if (rc < 0 && rc != -ENOENT)
			goto unlock;	/* real error: don't mark uptodate */
		if (bytes < (int)PAGE_CACHE_SIZE) {
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
			zero_user_segment(page, s, PAGE_CACHE_SIZE);
		}
		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
unlock:
		unlock_page(page);
		page_cache_release(page);
		bytes -= PAGE_CACHE_SIZE;
	}
	kfree(osd_data->pages);
}
276
277
278
279
280
/*
 * Start an async read(ahead) operation covering the contiguous run of
 * pages at the tail of @page_list (up to @max pages).  Returns the
 * number of pages submitted for read, or a negative error code.
 */
static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
		      struct list_head *page_list, int max)
{
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = list_entry(page_list->prev, struct page, lru);
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	u64 off;
	u64 len;
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
	int got = 0;
	int ret = 0;

	if (!rw_ctx) {
		/* no read context from the caller: take FILE_CACHE cap
		 * refs ourselves, or bail out and drop all the pages */
		int want = CEPH_CAP_FILE_CACHE;
		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
		if (ret < 0) {
			dout("start_read %p, error getting cap\n", inode);
		} else if (!(got & want)) {
			dout("start_read %p, no cache cap\n", inode);
			ret = 0;
		}
		if (ret <= 0) {
			if (got)
				ceph_put_cap_refs(ci, got);
			while (!list_empty(page_list)) {
				page = list_entry(page_list->prev,
						  struct page, lru);
				list_del(&page->lru);
				put_page(page);
			}
			return ret;
		}
	}

	off = (u64) page_offset(page);

	/* count contiguous pages at the tail of the list */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
		if (max && nr_pages == max)
			break;
	}
	len = nr_pages << PAGE_CACHE_SHIFT;
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);
	vino = ceph_vino(inode);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
				    0, 1, CEPH_OSD_OP_READ,
				    CEPH_OSD_FLAG_READ, NULL,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    false);
	if (IS_ERR(req)) {
		ret = PTR_ERR(req);
		goto out;
	}

	/* build page vector; move each page from the list into the cache */
	nr_pages = calc_pages_for(0, len);
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
	if (!pages) {
		ret = -ENOMEM;
		goto out_put;
	}
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
		list_del(&page->lru);

		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_KERNEL)) {
			page_cache_release(page);
			dout("start_read %p add_to_page_cache failed %p\n",
			     inode, page);
			nr_pages = i;
			if (nr_pages > 0) {
				/* shrink the request to the pages that
				 * did make it into the cache */
				len = nr_pages << PAGE_SHIFT;
				osd_req_op_extent_update(req, 0, len);
				break;
			}
			goto out_pages;
		}
		pages[i] = page;
	}
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_read;
	req->r_inode = inode;

	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);

	/* the locked pages now pin the inode's cache state; our cap
	 * refs (if we took any above) can be dropped */
	if (got)
		ceph_put_cap_refs(ci, got);

	return nr_pages;

out_pages:
	for (i = 0; i < nr_pages; ++i)
		unlock_page(pages[i]);
	ceph_put_page_vector(pages, nr_pages, false);
out_put:
	ceph_osdc_put_request(req);
out:
	if (got)
		ceph_put_cap_refs(ci, got);
	return ret;
}
406
407
408
409
410
411
/*
 * Read multiple pages (readahead).  Submits reads in chunks via
 * start_read() until the list is drained or an error occurs; pages
 * that were not submitted remain on @page_list for the VM to clean up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_file_info *fi = file->private_data;
	struct ceph_rw_context *rw_ctx;
	int rc = 0;
	int max = 0;

	/* inline data cannot be read page-by-page from the OSDs */
	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return -EINVAL;

	rw_ctx = ceph_find_rw_context(fi);
	/* cap each OSD read at the rsize mount option, in pages */
	max = fsc->mount_options->rsize >> PAGE_SHIFT;
	dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
	     inode, file, rw_ctx, nr_pages, max);
	while (!list_empty(page_list)) {
		rc = start_read(inode, rw_ctx, page_list, max);
		if (rc < 0)
			goto out;
	}
out:
	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}
438
/*
 * Snapshot of the size/truncate state to use during one writeback
 * pass; filled in by get_oldest_context().
 */
struct ceph_writeback_ctl
{
	loff_t i_size;		/* file size to write back against */
	u64 truncate_size;	/* truncate_size of the chosen snap context */
	u32 truncate_seq;	/* truncate_seq of the chosen snap context */
	bool size_stable;	/* i_size came from a completed capsnap and
				   cannot change under us */
	bool head_snapc;	/* writing under the head (live) snap context */
};
447
448
449
450
451
/*
 * Return (a reference to) the oldest snap context that still has dirty
 * pages on this inode, or NULL if there is no dirty data.  Cap snaps
 * are checked first, oldest to newest; the head context is used only
 * when no capsnap has dirty pages.  If @ctl is non-NULL it is filled
 * with the size/truncate state matching the chosen context.  If
 * @page_snapc is non-NULL, a capsnap matching it is preferred over a
 * merely-older one.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				/* capsnap still being written: size not
				 * yet final */
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		/* no capsnap with dirty pages; fall back to head context */
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}
508
/*
 * Compute how many bytes starting at @start (within @page) should be
 * written back: bounded by the size recorded in the page's capsnap
 * (or i_size for the head context) and by the end of the page.
 */
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = page_snap_context(page);
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);

	if (snapc != ci->i_head_snapc) {
		/* non-head context: use the size frozen in the capsnap,
		 * unless it is still being written */
		bool found = false;
		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > page_offset(page) + PAGE_SIZE)
		end = page_offset(page) + PAGE_SIZE;
	return end > start ? end - start : 0;
}
535
536
537
538
539
540
541
/*
 * Write a single dirty page out synchronously.  The page is left
 * locked on return; the caller unlocks it.  Returns 0 on success or
 * if the page is (re)dirtied for later, or a negative error.
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err, len = PAGE_CACHE_SIZE;
	struct ceph_writeback_ctl ceph_wbc;

	dout("writepage %p idx %lu\n", page, page->index);

	inode = page->mapping->host;
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		/* a newer context: older contexts must be flushed first */
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
		page->mapping->a_ops->invalidatepage(page, 0);
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

	set_page_writeback(page);
	err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
				   &ci->i_layout, snapc, page_off, len,
				   ceph_wbc.truncate_seq,
				   ceph_wbc.truncate_size,
				   &inode->i_mtime, &page, 1);
	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by a fatal signal: redirty and retry later */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			return err;
		}
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		SetPageError(page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;	/* vfs expects us to return 0 */
	}
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		clear_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

	return err;
}
631
632static int ceph_writepage(struct page *page, struct writeback_control *wbc)
633{
634 int err;
635 struct inode *inode = page->mapping->host;
636 BUG_ON(!inode);
637 ihold(inode);
638 err = writepage_nounlock(page, wbc);
639 if (err == -ERESTARTSYS) {
640
641
642 err = 0;
643 }
644 unlock_page(page);
645 iput(inode);
646 return err;
647}
648
649
650
651
652
653static void ceph_release_pages(struct page **pages, int num)
654{
655 struct pagevec pvec;
656 int i;
657
658 pagevec_init(&pvec, 0);
659 for (i = 0; i < num; i++) {
660 if (pagevec_add(&pvec, pages[i]) == 0)
661 pagevec_release(&pvec);
662 }
663 pagevec_release(&pvec);
664}
665
666
667
668
669
670
671
672
/*
 * Async writeback completion handler.
 *
 * On error, set the mapping error bit (not per-page error bits), clean
 * up all pages attached to the request's write ops, release the
 * wrbuffer cap refs taken when the pages were dirtied, and free the
 * page vector.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
	} else {
		ceph_clear_error_write(ci);
	}

	/*
	 * If we no longer hold the cache cap, remove the page from the
	 * cache before unlocking it, so stale data cannot be read back.
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages, one write op at a time */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			    CONGESTION_OFF_THRESH(
				    fsc->mount_options->congestion_kb))
				clear_bdi_congested(&fsc->backing_dev_info,
						    BLK_RW_ASYNC);

			/* drop the snap context stashed at dirty time */
			ceph_put_snap_context(page_snap_context(page));
			page->private = 0;
			ClearPagePrivate(page);
			dout("unlocking %p\n", page);
			end_page_writeback(page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		ceph_release_pages(osd_data->pages, num_pages);
	}

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	/* the page vector is shared by all ops; free it once */
	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages,
			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}
754
755
756
757
/*
 * Initiate async writeback of dirty pages: gather dirty pages that
 * share the oldest snap context into (possibly multi-op) OSD write
 * requests, looping over snap contexts until done.
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start_index, end = -1;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int rc = 0;
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	struct ceph_writeback_ctl ceph_wbc;
	bool should_loop, range_whole = false;
	bool done = false;

	dout("writepages_start %p (mode=%s)\n", inode,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		/* forced umount: don't write anything */
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO;
	}
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	pagevec_init(&pvec, 0);

	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	index = start_index;

retry:
	/* find oldest snap context with dirty data */
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!snapc) {
		/* nothing dirty to flush */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/*
		 * non-head snap context: ignore wbc->range_{start,end};
		 * dirty pages associated with this (older) context must
		 * all be written before any newer-context pages become
		 * writeable, so sweep the whole file.
		 */
		if (index > 0)
			should_loop = true;
		dout(" non-head snapc, range whole\n");
	}

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;

	while (!done && index <= end) {
		int num_ops = 0, op_idx;
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		mempool_t *pool = NULL;	/* non-NULL if page array from mempool */
		struct page *page;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;

		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		pvec_pages = min_t(unsigned, PAGEVEC_SIZE,
				   max_pages - locked_pages);
		if (end - index < (u64)(pvec_pages - 1))
			pvec_pages = (unsigned)(end - index) + 1;

		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_DIRTY,
						pvec_pages);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);	/* first page: may block */
			else if (!trylock_page(page))
				break;

			/* only dirty pages in this mapping qualify */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
			}
			/* only pages with the matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
				unlock_page(page);
				continue;
			}
			if (page_offset(page) >= ceph_wbc.i_size) {
				dout("%p page eof %llu\n",
				     page, ceph_wbc.i_size);
				if (ceph_wbc.size_stable ||
				    page_offset(page) >= i_size_read(inode))
					mapping->a_ops->invalidatepage(page, 0);
				unlock_page(page);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				continue;
			}

			/*
			 * We have something to write.  If this is the
			 * first locked page this time through, compute
			 * the max write size (bounded by the strip
			 * unit) and allocate the page array.
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;

				num_ops = 1;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_CACHE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc(max_pages * sizeof (*pages),
						GFP_NOFS);
				if (!pages) {
					/* fall back to the mempool */
					pool = fsc->wb_pagevec_pool;
					pages = mempool_alloc(pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_CACHE_SHIFT) {
				/* non-contiguous page: needs a new op */
				if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
						       CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note the page for this request */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(&fsc->backing_dev_info,
						  BLK_RW_ASYNC);
			}

			/* take ownership of the page out of the pvec */
			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

			len += PAGE_CACHE_SIZE;
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			/* shift unused pages to beginning of pvec */
			unsigned j, n = 0;

			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_release(&pvec);
				goto get_more_pages;
			}
		}

new_request:
		offset = page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					snapc, ceph_wbc.truncate_seq,
					ceph_wbc.truncate_size, false);
		if (IS_ERR(req)) {
			/* retry with fewer ops, from the mempool */
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						offset, &len, 0,
						min(num_ops,
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
						CEPH_OSD_FLAG_WRITE,
						snapc, ceph_wbc.truncate_seq,
						ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
			     PAGE_CACHE_SIZE - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
				/* gap: close the current extent op and
				 * start a new one, if the request has room */
				if (op_idx + 1 == req->r_num_ops)
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							!!pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
			len += PAGE_CACHE_SIZE;
		}

		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - PAGE_CACHE_SIZE;
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, !!pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		BUG_ON(op_idx + 1 != req->r_num_ops);

		pool = NULL;
		if (i < locked_pages) {
			/* pages left over: move them into a fresh array
			 * for the next request */
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			data_pages = pages;
			pages = kmalloc(locked_pages * sizeof (*pages),
					GFP_NOFS);
			if (!pages) {
				pool = fsc->wb_pagevec_pool;
				pages = mempool_alloc(pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = true;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with the next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
							PAGECACHE_TAG_WRITEBACK,
							PAGEVEC_SIZE))) {
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(last_snapc);
	dout("writepages dend - startone, rc = %d\n", rc);
	return rc;
}
1173
1174
1175
1176
1177
1178
1179static int context_is_writeable_or_written(struct inode *inode,
1180 struct ceph_snap_context *snapc)
1181{
1182 struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
1183 int ret = !oldest || snapc->seq <= oldest->seq;
1184
1185 ceph_put_snap_context(oldest);
1186 return ret;
1187}
1188
1189
1190
1191
1192
1193
1194
1195
1196
/*
 * Prepare a locked page for writing: we may only dirty it if it is
 * clean or already dirty within the same snap context.  Otherwise the
 * old context's data is written out (or waited for) first.
 *
 * Called with the page locked.  Returns 0 with the page still locked,
 * or any failure (including -EAGAIN, which asks the caller to re-grab
 * the page) with the page unlocked.
 */
static int ceph_update_writeable_page(struct file *file,
			    loff_t pos, unsigned len,
			    struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	loff_t page_off = pos & PAGE_CACHE_MASK;
	int pos_in_page = pos & ~PAGE_CACHE_MASK;
	int end_in_page = pos_in_page + len;
	loff_t i_size;
	int r;
	struct ceph_snap_context *snapc, *oldest;

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		unlock_page(page);
		return -EIO;
	}

retry_locked:
	/* writepages currently holds page lock, but if we change that later, */
	wait_on_page_writeback(page);

	snapc = page_snap_context(page);
	if (snapc && snapc != ci->i_head_snapc) {
		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n",
			     page, snapc);
			/*
			 * queue for writeback, and wait for snapc to
			 * be writeable or written
			 */
			snapc = ceph_get_snap_context(snapc);
			unlock_page(page);
			ceph_queue_writeback(inode);
			r = wait_event_killable(ci->i_cap_wq,
			       context_is_writeable_or_written(inode, snapc));
			ceph_put_snap_context(snapc);
			if (r == -ERESTARTSYS)
				return r;
			return -EAGAIN;
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n",
		     page, snapc);
		if (!clear_page_dirty_for_io(page))
			goto retry_locked;
		r = writepage_nounlock(page, NULL);
		if (r < 0)
			goto fail_nosnap;
		goto retry_locked;
	}

	if (PageUptodate(page)) {
		dout(" page %p already uptodate\n", page);
		return 0;
	}

	/* full page? no need to read anything first */
	if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
		return 0;

	/* past end of file? */
	i_size = i_size_read(inode);

	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
		/* writing beyond EOF: zero the untouched parts instead
		 * of reading */
		dout(" zeroing %p 0 - %d and %d - %d\n",
		     page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
		zero_user_segments(page,
				   0, pos_in_page,
				   end_in_page, PAGE_CACHE_SIZE);
		return 0;
	}

	/* we need to read it: fill the page and retry the checks */
	r = readpage_nounlock(file, page);
	if (r < 0)
		goto fail_nosnap;
	goto retry_locked;
fail_nosnap:
	unlock_page(page);
	return r;
}
1291
1292
1293
1294
1295
1296static int ceph_write_begin(struct file *file, struct address_space *mapping,
1297 loff_t pos, unsigned len, unsigned flags,
1298 struct page **pagep, void **fsdata)
1299{
1300 struct inode *inode = file_inode(file);
1301 struct page *page;
1302 pgoff_t index = pos >> PAGE_CACHE_SHIFT;
1303 int r;
1304
1305 do {
1306
1307 page = grab_cache_page_write_begin(mapping, index, 0);
1308 if (!page)
1309 return -ENOMEM;
1310
1311 dout("write_begin file %p inode %p page %p %d~%d\n", file,
1312 inode, page, (int)pos, (int)len);
1313
1314 r = ceph_update_writeable_page(file, pos, len, page);
1315 if (r < 0)
1316 page_cache_release(page);
1317 else
1318 *pagep = page;
1319 } while (r == -EAGAIN);
1320
1321 return r;
1322}
1323
1324
1325
1326
1327
1328static int ceph_write_end(struct file *file, struct address_space *mapping,
1329 loff_t pos, unsigned len, unsigned copied,
1330 struct page *page, void *fsdata)
1331{
1332 struct inode *inode = file_inode(file);
1333 bool check_cap = false;
1334
1335 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1336 inode, page, (int)pos, (int)copied, (int)len);
1337
1338
1339 if (!PageUptodate(page)) {
1340 if (copied < len) {
1341 copied = 0;
1342 goto out;
1343 }
1344 SetPageUptodate(page);
1345 }
1346
1347
1348 if (pos+copied > i_size_read(inode))
1349 check_cap = ceph_inode_set_size(inode, pos+copied);
1350
1351 set_page_dirty(page);
1352
1353out:
1354 unlock_page(page);
1355 page_cache_release(page);
1356
1357 if (check_cap)
1358 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
1359
1360 return copied;
1361}
1362
1363
1364
1365
1366
1367
/*
 * Direct-I/O stub.  It is registered as ->direct_IO in ceph_aops but
 * is never expected to be reached; if it is, something is wrong, so
 * WARN loudly and fail the request.
 */
static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
			      const struct iovec *iov,
			      loff_t pos, unsigned long nr_segs)
{
	WARN_ON(1);	/* should never be called */
	return -EINVAL;
}
1375
/*
 * Address-space operations for Ceph regular files.  Buffered I/O goes
 * through the pagecache via the read/write hooks above; direct_IO is
 * wired to a stub that always WARNs and fails (see ceph_direct_io).
 */
const struct address_space_operations ceph_aops = {
	.readpage = ceph_readpage,
	.readpages = ceph_readpages,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.set_page_dirty = ceph_set_page_dirty,
	.invalidatepage = ceph_invalidatepage,
	.releasepage = ceph_releasepage,
	.direct_IO = ceph_direct_io,	/* stub: WARNs, returns -EINVAL */
};
1388
/*
 * Block every signal except SIGKILL for the current task, saving the
 * previous mask in *oldset for ceph_restore_sigs().  Used around cap
 * acquisition in the fault paths below so that only a fatal signal can
 * interrupt the wait.
 */
static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}
1395
/* Restore the signal mask previously saved by ceph_block_sigs(). */
static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
1400
1401
1402
1403
1404static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1405{
1406 struct inode *inode = file_inode(vma->vm_file);
1407 struct ceph_inode_info *ci = ceph_inode(inode);
1408 struct ceph_file_info *fi = vma->vm_file->private_data;
1409 struct page *pinned_page = NULL;
1410 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1411 int want, got, ret;
1412 sigset_t oldset;
1413
1414 ceph_block_sigs(&oldset);
1415
1416 dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1417 inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE);
1418 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1419 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1420 else
1421 want = CEPH_CAP_FILE_CACHE;
1422
1423 got = 0;
1424 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
1425 if (ret < 0)
1426 goto out_restore;
1427
1428 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1429 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1430
1431 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
1432 ci->i_inline_version == CEPH_INLINE_NONE) {
1433 CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
1434 ceph_add_rw_context(fi, &rw_ctx);
1435 ret = filemap_fault(vma, vmf);
1436 ceph_del_rw_context(fi, &rw_ctx);
1437 } else
1438 ret = -EAGAIN;
1439
1440 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1441 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1442 if (pinned_page)
1443 page_cache_release(pinned_page);
1444 ceph_put_cap_refs(ci, got);
1445
1446 if (ret != -EAGAIN)
1447 goto out_restore;
1448
1449
1450 if (off >= PAGE_CACHE_SIZE) {
1451
1452 ret = VM_FAULT_SIGBUS;
1453 } else {
1454 int ret1;
1455 struct address_space *mapping = inode->i_mapping;
1456 struct page *page = find_or_create_page(mapping, 0,
1457 mapping_gfp_mask(mapping) &
1458 ~__GFP_FS);
1459 if (!page) {
1460 ret = VM_FAULT_OOM;
1461 goto out_inline;
1462 }
1463 ret1 = __ceph_do_getattr(inode, page,
1464 CEPH_STAT_CAP_INLINE_DATA, true);
1465 if (ret1 < 0 || off >= i_size_read(inode)) {
1466 unlock_page(page);
1467 page_cache_release(page);
1468 if (ret1 < 0)
1469 ret = ret1;
1470 else
1471 ret = VM_FAULT_SIGBUS;
1472 goto out_inline;
1473 }
1474 if (ret1 < PAGE_CACHE_SIZE)
1475 zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
1476 else
1477 flush_dcache_page(page);
1478 SetPageUptodate(page);
1479 vmf->page = page;
1480 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1481out_inline:
1482 dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
1483 inode, off, (size_t)PAGE_SIZE, ret);
1484 }
1485out_restore:
1486 ceph_restore_sigs(&oldset);
1487 if (ret < 0)
1488 ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
1489
1490 return ret;
1491}
1492
1493
1494
1495
/*
 * Make a mmap'ed page writeable (VM ->page_mkwrite hook).
 *
 * Uninlines any inline data first, takes CEPH_CAP_FILE_WR/BUFFER cap
 * references covering the page, then dirties the page via
 * ceph_update_writeable_page(), retrying while old snap contexts force
 * writeback.  Returns a VM_FAULT_* code (VM_FAULT_LOCKED on success,
 * with the page left locked).
 */
static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
{
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, ret;
	sigset_t oldset;

	/* preallocate so __ceph_mark_dirty_caps cannot fail under the lock */
	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	/* only a fatal signal may interrupt the cap wait below */
	ceph_block_sigs(&oldset);

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
			/* page 0 is where the inline data itself lives */
			lock_page(page);
			locked_page = page;
		}
		ret = ceph_uninline_data(vma->vm_file, locked_page);
		if (locked_page)
			unlock_page(locked_page);
		if (ret < 0)
			goto out_free;
	}

	/* clamp the dirty range to EOF on the last, partial page */
	if (off + PAGE_CACHE_SIZE <= size)
		len = PAGE_CACHE_SIZE;
	else
		len = size & ~PAGE_CACHE_MASK;

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
			    &got, NULL);
	if (ret < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* update mtime/ctime before taking the page lock */
	file_update_time(vma->vm_file);

	do {
		lock_page(page);

		/* page may have been truncated or remapped while unlocked */
		if ((off > size) || (page->mapping != inode->i_mapping)) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
		if (ret >= 0) {
			/* success: keep the page locked for the caller */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
		}
	} while (ret == -EAGAIN);	/* old snapc forced a writeback; retry */

	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	ceph_free_cap_flush(prealloc_cf);
	if (ret < 0)
		ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
	return ret;
}
1591
1592void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
1593 char *data, size_t len)
1594{
1595 struct address_space *mapping = inode->i_mapping;
1596 struct page *page;
1597
1598 if (locked_page) {
1599 page = locked_page;
1600 } else {
1601 if (i_size_read(inode) == 0)
1602 return;
1603 page = find_or_create_page(mapping, 0,
1604 mapping_gfp_mask(mapping) & ~__GFP_FS);
1605 if (!page)
1606 return;
1607 if (PageUptodate(page)) {
1608 unlock_page(page);
1609 page_cache_release(page);
1610 return;
1611 }
1612 }
1613
1614 dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
1615 inode, ceph_vinop(inode), len, locked_page);
1616
1617 if (len > 0) {
1618 void *kaddr = kmap_atomic(page);
1619 memcpy(kaddr, data, len);
1620 kunmap_atomic(kaddr);
1621 }
1622
1623 if (page != locked_page) {
1624 if (len < PAGE_CACHE_SIZE)
1625 zero_user_segment(page, len, PAGE_CACHE_SIZE);
1626 else
1627 flush_dcache_page(page);
1628
1629 SetPageUptodate(page);
1630 unlock_page(page);
1631 page_cache_release(page);
1632 }
1633}
1634
/*
 * Migrate an inode's inline data out to its first RADOS object.
 *
 * The data (at most one page) is taken from @locked_page, from an
 * uptodate pagecache page 0 (if we hold cache caps), or fetched via
 * __ceph_do_getattr().  The write is guarded by a CMPXATTR(GT) /
 * SETXATTR pair on the "inline_version" object xattr so that a racing
 * uninline from another client with a newer version wins.
 *
 * Returns 0 on success (including "nothing to do"), negative errno on
 * failure.
 */
int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct page *page = NULL;
	u64 len, inline_version;
	int err = 0;
	bool from_pagecache = false;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	/* nothing to migrate for these versions */
	if (inline_version == 1 ||
	    inline_version == CEPH_INLINE_NONE)
		goto out;

	/* find a source page for the inline content */
	if (locked_page) {
		page = locked_page;
		WARN_ON(!PageUptodate(page));
	} else if (ceph_caps_issued(ci) &
		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
		page = find_get_page(inode->i_mapping, 0);
		if (page) {
			if (PageUptodate(page)) {
				from_pagecache = true;
				lock_page(page);
			} else {
				page_cache_release(page);
				page = NULL;
			}
		}
	}

	if (page) {
		/* inline data is capped at one page */
		len = i_size_read(inode);
		if (len > PAGE_CACHE_SIZE)
			len = PAGE_CACHE_SIZE;
	} else {
		/* no usable cached copy: fetch inline data from the MDS */
		page = __page_cache_alloc(GFP_NOFS);
		if (!page) {
			err = -ENOMEM;
			goto out;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0) {
			/* inline data was dropped meanwhile; not an error */
			if (err == -ENODATA)
				err = 0;
			goto out;
		}
		len = err;	/* getattr returns the inline data length */
	}

	/* first request: create the destination object */
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out;

	/* second request: op 0 = version guard, op 1 = data, op 2 = xattr */
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

	{
		/* only proceed if our version is newer than the object's */
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put;
	}

	{
		/* record the version we just wrote */
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
out_put:
	ceph_osdc_put_request(req);
	/* -ECANCELED: presumably the cmpxattr guard fired — a racing
	 * writer already stored a newer version, so treat as success */
	if (err == -ECANCELED)
		err = 0;
out:
	if (page && page != locked_page) {
		if (from_pagecache) {
			unlock_page(page);
			page_cache_release(page);
		} else
			__free_pages(page, 0);
	}

	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}
1767
1768static struct vm_operations_struct ceph_vmops = {
1769 .fault = ceph_filemap_fault,
1770 .page_mkwrite = ceph_page_mkwrite,
1771 .remap_pages = generic_file_remap_pages,
1772};
1773
1774int ceph_mmap(struct file *file, struct vm_area_struct *vma)
1775{
1776 struct address_space *mapping = file->f_mapping;
1777
1778 if (!mapping->a_ops->readpage)
1779 return -ENOEXEC;
1780 file_accessed(file);
1781 vma->vm_ops = &ceph_vmops;
1782 return 0;
1783}
1784
/* access bits cached per (pool, namespace) by __ceph_pool_perm_get() */
enum {
	POOL_READ = 1,	/* client may read objects in the pool */
	POOL_WRITE = 2,	/* client may write objects in the pool */
};
1789
/*
 * Determine this client's access rights to @pool / @pool_ns by probing
 * the OSDs: a STAT read and an exclusive CREATE write are issued
 * against a per-inode probe object, and -EPERM from either indicates
 * the missing capability.  Results are cached in an rbtree keyed by
 * (pool, namespace) under mdsc->pool_perm_rwsem.
 *
 * Returns a POOL_READ|POOL_WRITE bitmask, or negative errno.
 */
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	/* fast path: look for a cached entry under the read lock */
	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	/* recheck under the write lock: someone may have raced us here */
	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	/* read probe: a STAT on the per-inode probe object */
	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* write probe: an exclusive CREATE on the same object */
	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page to receive (and discard) the STAT reply payload */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	wr_req->r_abort_on_full = true;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	/* both probes run in parallel; wait for each */
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	/* -ENOENT (object absent) still proves read access */
	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM)
		goto out_unlock;

	/* -EEXIST (object present) still proves write access */
	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		err = err2;
		goto out_unlock;
	}

	/* cache the result; the +1 holds the NUL for pool_ns */
	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}
1961
/*
 * Check whether this client may perform @need (CEPH_CAP_FILE_RD and/or
 * CEPH_CAP_FILE_WR) I/O against the inode's data pool.  Uses the
 * per-inode cached CEPH_I_POOL_* flags when valid, otherwise probes
 * via __ceph_pool_perm_get() and caches the outcome on the inode.
 *
 * Returns 0 if allowed, -EPERM if denied, or another negative errno if
 * the probe itself failed.
 */
int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
	s64 pool;
	struct ceph_string *pool_ns;
	int ret, flags;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * NOTE(review): snap inodes skip the check entirely —
		 * presumably because their data was written (and thus
		 * permission-checked) before the snapshot; confirm.
		 */
		return 0;
	}

	/* the nopoolperm mount option disables this check */
	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	/* CEPH_I_POOL_PERM set means the RD/WR bits below are valid */
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %lld no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %lld no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		/* layout unchanged: commit our probe result */
		ci->i_ceph_flags |= flags;
	} else {
		/* layout changed while probing; re-evaluate with new pool */
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}
2023
2024void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
2025{
2026 struct ceph_pool_perm *perm;
2027 struct rb_node *n;
2028
2029 while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
2030 n = rb_first(&mdsc->pool_perm_tree);
2031 perm = rb_entry(n, struct ceph_pool_perm, node);
2032 rb_erase(n, &mdsc->pool_perm_tree);
2033 kfree(perm);
2034 }
2035}
2036