1#include <linux/ceph/ceph_debug.h>
2
3#include <linux/module.h>
4#include <linux/err.h>
5#include <linux/highmem.h>
6#include <linux/mm.h>
7#include <linux/pagemap.h>
8#include <linux/slab.h>
9#include <linux/uaccess.h>
10#ifdef CONFIG_BLOCK
11#include <linux/bio.h>
12#endif
13
14#include <linux/ceph/libceph.h>
15#include <linux/ceph/osd_client.h>
16#include <linux/ceph/messenger.h>
17#include <linux/ceph/decode.h>
18#include <linux/ceph/auth.h>
19#include <linux/ceph/pagelist.h>
20
21#define OSD_OP_FRONT_LEN 4096
22#define OSD_OPREPLY_FRONT_LEN 512
23
24static const struct ceph_connection_operations osd_con_ops;
25static int __kick_requests(struct ceph_osd_client *osdc,
26 struct ceph_osd *kickosd);
27
28static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
29
30static int op_needs_trail(int op)
31{
32 switch (op) {
33 case CEPH_OSD_OP_GETXATTR:
34 case CEPH_OSD_OP_SETXATTR:
35 case CEPH_OSD_OP_CMPXATTR:
36 case CEPH_OSD_OP_CALL:
37 return 1;
38 default:
39 return 0;
40 }
41}
42
43static int op_has_extent(int op)
44{
45 return (op == CEPH_OSD_OP_READ ||
46 op == CEPH_OSD_OP_WRITE);
47}
48
/*
 * Map a file extent (off~*plen) onto a single object within the given
 * file layout, filling in the request's extent op, page count, and page
 * alignment.
 *
 * On return, *plen may have been reduced if the extent crossed an
 * object boundary, and *bno holds the object (block) number within the
 * file.  Also stamps the snapid into the request head.
 */
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			struct ceph_file_layout *layout,
			u64 snapid,
			u64 off, u64 *plen, u64 *bno,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	/* natural (in-file) alignment of the data payload */
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);

}
EXPORT_SYMBOL(ceph_calc_raw_layout);
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109static void calc_layout(struct ceph_osd_client *osdc,
110 struct ceph_vino vino,
111 struct ceph_file_layout *layout,
112 u64 off, u64 *plen,
113 struct ceph_osd_request *req,
114 struct ceph_osd_req_op *op)
115{
116 u64 bno;
117
118 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
119 plen, &bno, req, op);
120
121 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
122 req->r_oid_len = strlen(req->r_oid);
123}
124
125
126
127
/*
 * Final kref release for an osd request: drop message refs, revoke any
 * in-progress reply fill, release owned pages/bio/trail, and free the
 * request (back to the mempool if it came from one).
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		/* a connection may still be filling r_reply with data;
		 * revoke that before dropping the con ref */
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
					req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
163
164static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
165{
166 int i = 0;
167
168 if (needs_trail)
169 *needs_trail = 0;
170 while (ops[i].op) {
171 if (needs_trail && op_needs_trail(ops[i].op))
172 *needs_trail = 1;
173 i++;
174 }
175
176 return i;
177}
178
179struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
180 int flags,
181 struct ceph_snap_context *snapc,
182 struct ceph_osd_req_op *ops,
183 bool use_mempool,
184 gfp_t gfp_flags,
185 struct page **pages,
186 struct bio *bio)
187{
188 struct ceph_osd_request *req;
189 struct ceph_msg *msg;
190 int needs_trail;
191 int num_op = get_num_ops(ops, &needs_trail);
192 size_t msg_size = sizeof(struct ceph_osd_request_head);
193
194 msg_size += num_op*sizeof(struct ceph_osd_op);
195
196 if (use_mempool) {
197 req = mempool_alloc(osdc->req_mempool, gfp_flags);
198 memset(req, 0, sizeof(*req));
199 } else {
200 req = kzalloc(sizeof(*req), gfp_flags);
201 }
202 if (req == NULL)
203 return NULL;
204
205 req->r_osdc = osdc;
206 req->r_mempool = use_mempool;
207
208 kref_init(&req->r_kref);
209 init_completion(&req->r_completion);
210 init_completion(&req->r_safe_completion);
211 INIT_LIST_HEAD(&req->r_unsafe_item);
212 req->r_flags = flags;
213
214 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
215
216
217 if (use_mempool)
218 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
219 else
220 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
221 OSD_OPREPLY_FRONT_LEN, gfp_flags);
222 if (!msg) {
223 ceph_osdc_put_request(req);
224 return NULL;
225 }
226 req->r_reply = msg;
227
228
229 if (needs_trail) {
230 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
231 if (!req->r_trail) {
232 ceph_osdc_put_request(req);
233 return NULL;
234 }
235 ceph_pagelist_init(req->r_trail);
236 }
237
238 msg_size += 40;
239 if (snapc)
240 msg_size += sizeof(u64) * snapc->num_snaps;
241 if (use_mempool)
242 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
243 else
244 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
245 if (!msg) {
246 ceph_osdc_put_request(req);
247 return NULL;
248 }
249
250 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
251 memset(msg->front.iov_base, 0, msg->front.iov_len);
252
253 req->r_request = msg;
254 req->r_pages = pages;
255#ifdef CONFIG_BLOCK
256 if (bio) {
257 req->r_bio = bio;
258 bio_get(req->r_bio);
259 }
260#endif
261
262 return req;
263}
264EXPORT_SYMBOL(ceph_osdc_alloc_request);
265
266static void osd_req_encode_op(struct ceph_osd_request *req,
267 struct ceph_osd_op *dst,
268 struct ceph_osd_req_op *src)
269{
270 dst->op = cpu_to_le16(src->op);
271
272 switch (dst->op) {
273 case CEPH_OSD_OP_READ:
274 case CEPH_OSD_OP_WRITE:
275 dst->extent.offset =
276 cpu_to_le64(src->extent.offset);
277 dst->extent.length =
278 cpu_to_le64(src->extent.length);
279 dst->extent.truncate_size =
280 cpu_to_le64(src->extent.truncate_size);
281 dst->extent.truncate_seq =
282 cpu_to_le32(src->extent.truncate_seq);
283 break;
284
285 case CEPH_OSD_OP_GETXATTR:
286 case CEPH_OSD_OP_SETXATTR:
287 case CEPH_OSD_OP_CMPXATTR:
288 BUG_ON(!req->r_trail);
289
290 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
291 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
292 dst->xattr.cmp_op = src->xattr.cmp_op;
293 dst->xattr.cmp_mode = src->xattr.cmp_mode;
294 ceph_pagelist_append(req->r_trail, src->xattr.name,
295 src->xattr.name_len);
296 ceph_pagelist_append(req->r_trail, src->xattr.val,
297 src->xattr.value_len);
298 break;
299 case CEPH_OSD_OP_CALL:
300 BUG_ON(!req->r_trail);
301
302 dst->cls.class_len = src->cls.class_len;
303 dst->cls.method_len = src->cls.method_len;
304 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
305
306 ceph_pagelist_append(req->r_trail, src->cls.class_name,
307 src->cls.class_len);
308 ceph_pagelist_append(req->r_trail, src->cls.method_name,
309 src->cls.method_len);
310 ceph_pagelist_append(req->r_trail, src->cls.indata,
311 src->cls.indata_len);
312 break;
313 case CEPH_OSD_OP_ROLLBACK:
314 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
315 break;
316 case CEPH_OSD_OP_STARTSYNC:
317 break;
318 default:
319 pr_err("unrecognized osd opcode %d\n", dst->op);
320 WARN_ON(1);
321 break;
322 }
323 dst->payload_len = cpu_to_le32(src->payload_len);
324}
325
326
327
328
329
/*
 * Encode the request head, op array, object name, and snap context
 * into the previously allocated request message, and set the data
 * length/offset fields in the message header.  The message front is
 * then shrunk to the bytes actually used.
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);	/* op array follows the head */
	p = (void *)(op + num_op);	/* then the variable-length fields */

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	/* encode each op in wire order */
	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	/* the trail (if any) is counted as part of the data payload */
	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	/* shrink the front to the bytes actually written */
	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);
403
404
405
406
407
408
409
410
411
412
413
414
415struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
416 struct ceph_file_layout *layout,
417 struct ceph_vino vino,
418 u64 off, u64 *plen,
419 int opcode, int flags,
420 struct ceph_snap_context *snapc,
421 int do_sync,
422 u32 truncate_seq,
423 u64 truncate_size,
424 struct timespec *mtime,
425 bool use_mempool, int num_reply,
426 int page_align)
427{
428 struct ceph_osd_req_op ops[3];
429 struct ceph_osd_request *req;
430
431 ops[0].op = opcode;
432 ops[0].extent.truncate_seq = truncate_seq;
433 ops[0].extent.truncate_size = truncate_size;
434 ops[0].payload_len = 0;
435
436 if (do_sync) {
437 ops[1].op = CEPH_OSD_OP_STARTSYNC;
438 ops[1].payload_len = 0;
439 ops[2].op = 0;
440 } else
441 ops[1].op = 0;
442
443 req = ceph_osdc_alloc_request(osdc, flags,
444 snapc, ops,
445 use_mempool,
446 GFP_NOFS, NULL, NULL);
447 if (IS_ERR(req))
448 return req;
449
450
451 calc_layout(osdc, vino, layout, off, plen, req, ops);
452 req->r_file_layout = *layout;
453
454
455
456 req->r_page_alignment = page_align;
457
458 ceph_osdc_build_request(req, off, plen, ops,
459 snapc,
460 mtime,
461 req->r_oid, req->r_oid_len);
462
463 return req;
464}
465EXPORT_SYMBOL(ceph_osdc_new_request);
466
467
468
469
470static void __insert_request(struct ceph_osd_client *osdc,
471 struct ceph_osd_request *new)
472{
473 struct rb_node **p = &osdc->requests.rb_node;
474 struct rb_node *parent = NULL;
475 struct ceph_osd_request *req = NULL;
476
477 while (*p) {
478 parent = *p;
479 req = rb_entry(parent, struct ceph_osd_request, r_node);
480 if (new->r_tid < req->r_tid)
481 p = &(*p)->rb_left;
482 else if (new->r_tid > req->r_tid)
483 p = &(*p)->rb_right;
484 else
485 BUG();
486 }
487
488 rb_link_node(&new->r_node, parent, p);
489 rb_insert_color(&new->r_node, &osdc->requests);
490}
491
492static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
493 u64 tid)
494{
495 struct ceph_osd_request *req;
496 struct rb_node *n = osdc->requests.rb_node;
497
498 while (n) {
499 req = rb_entry(n, struct ceph_osd_request, r_node);
500 if (tid < req->r_tid)
501 n = n->rb_left;
502 else if (tid > req->r_tid)
503 n = n->rb_right;
504 else
505 return req;
506 }
507 return NULL;
508}
509
510static struct ceph_osd_request *
511__lookup_request_ge(struct ceph_osd_client *osdc,
512 u64 tid)
513{
514 struct ceph_osd_request *req;
515 struct rb_node *n = osdc->requests.rb_node;
516
517 while (n) {
518 req = rb_entry(n, struct ceph_osd_request, r_node);
519 if (tid < req->r_tid) {
520 if (!n->rb_left)
521 return req;
522 n = n->rb_left;
523 } else if (tid > req->r_tid) {
524 n = n->rb_right;
525 } else {
526 return req;
527 }
528 }
529 return NULL;
530}
531
532
533
534
535
/*
 * Connection reset callback: re-kick this osd's requests so they are
 * remapped and resent.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_requests(osdc, osd);
	up_read(&osdc->map_sem);
}
549
550
551
552
553static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
554{
555 struct ceph_osd *osd;
556
557 osd = kzalloc(sizeof(*osd), GFP_NOFS);
558 if (!osd)
559 return NULL;
560
561 atomic_set(&osd->o_ref, 1);
562 osd->o_osdc = osdc;
563 INIT_LIST_HEAD(&osd->o_requests);
564 INIT_LIST_HEAD(&osd->o_osd_lru);
565 osd->o_incarnation = 1;
566
567 ceph_con_init(osdc->client->msgr, &osd->o_con);
568 osd->o_con.private = osd;
569 osd->o_con.ops = &osd_con_ops;
570 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
571
572 INIT_LIST_HEAD(&osd->o_keepalive_item);
573 return osd;
574}
575
576static struct ceph_osd *get_osd(struct ceph_osd *osd)
577{
578 if (atomic_inc_not_zero(&osd->o_ref)) {
579 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
580 atomic_read(&osd->o_ref));
581 return osd;
582 } else {
583 dout("get_osd %p FAIL\n", osd);
584 return NULL;
585 }
586}
587
588static void put_osd(struct ceph_osd *osd)
589{
590 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
591 atomic_read(&osd->o_ref) - 1);
592 if (atomic_dec_and_test(&osd->o_ref)) {
593 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
594
595 if (osd->o_authorizer)
596 ac->ops->destroy_authorizer(ac, osd->o_authorizer);
597 kfree(osd);
598 }
599}
600
601
602
603
/*
 * Remove an osd from the osdc's rb-tree and lru, close its connection,
 * and drop the tree's reference.  The osd must have no outstanding
 * requests.  Callers in this file hold osdc->request_mutex.
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}
613
/*
 * Park an osd with no remaining requests on the idle lru, stamping the
 * time after which it may be reaped.
 */
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}
622
623static void __remove_osd_from_lru(struct ceph_osd *osd)
624{
625 dout("__remove_osd_from_lru %p\n", osd);
626 if (!list_empty(&osd->o_osd_lru))
627 list_del_init(&osd->o_osd_lru);
628}
629
/*
 * Reap idle osds from the lru whose ttl has expired, or all of them if
 * remove_all is set.
 */
static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		/* lru is ordered by insertion time; stop at the first
		 * entry that hasn't expired yet */
		if (!remove_all && time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}
643
644
645
646
/*
 * Reset an osd's connection state.  If it has no requests, just remove
 * it.  If its address is unchanged and the connection was never
 * successfully opened, leave it for the messenger to retry and return
 * -EAGAIN (after refreshing the requests' stamps so they aren't timed
 * out immediately).  Otherwise close and reopen the connection.
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each request so none appears stalled */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}
672
673static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
674{
675 struct rb_node **p = &osdc->osds.rb_node;
676 struct rb_node *parent = NULL;
677 struct ceph_osd *osd = NULL;
678
679 while (*p) {
680 parent = *p;
681 osd = rb_entry(parent, struct ceph_osd, o_node);
682 if (new->o_osd < osd->o_osd)
683 p = &(*p)->rb_left;
684 else if (new->o_osd > osd->o_osd)
685 p = &(*p)->rb_right;
686 else
687 BUG();
688 }
689
690 rb_link_node(&new->o_node, parent, p);
691 rb_insert_color(&new->o_node, &osdc->osds);
692}
693
694static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
695{
696 struct ceph_osd *osd;
697 struct rb_node *n = osdc->osds.rb_node;
698
699 while (n) {
700 osd = rb_entry(n, struct ceph_osd, o_node);
701 if (o < osd->o_osd)
702 n = n->rb_left;
703 else if (o > osd->o_osd)
704 n = n->rb_right;
705 else
706 return osd;
707 }
708 return NULL;
709}
710
/* (Re)arm the request timeout work at the keepalive interval. */
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}
716
/* Cancel the pending request timeout work, if any. */
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}
721
722
723
724
725
/*
 * Assign a tid to a request and register it in the osdc's tid-ordered
 * rb-tree; takes a reference on the request.  Starts the timeout work
 * when the first request is registered.
 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	INIT_LIST_HEAD(&req->r_req_lru_item);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);	/* ref owned by the rb-tree */
	osdc->num_requests++;

	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
	mutex_unlock(&osdc->request_mutex);
}
745
746
747
748
/*
 * Remove a request from the rb-tree and from its osd's request list,
 * revoking the outgoing message, and drop the registration reference.
 * Called with request_mutex held.
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request msg isn't (still) sent */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);	/* drop the rb-tree's ref */

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}
774
775
776
777
/*
 * Revoke a possibly still-queued outgoing message and remove the
 * request from the timeout lru.  Called with request_mutex held.
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
	list_del_init(&req->r_req_lru_item);
}
786
787
788
789
790
791
792
793
794
795
/*
 * Map a request onto its (primary) osd using the current osdmap and the
 * object's placement group, linking it onto that osd's request list and
 * creating/connecting the osd session if needed.
 *
 * Returns 0 if the mapping is unchanged, 1 if the request was remapped,
 * or a negative error code.
 */
static int __map_osds(struct ceph_osd_client *osdc,
		      struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_osds %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err)
		return err;
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];	/* primary */
		num = err;	/* size of the acting set */
	}

	/* nothing to do if the mapping (osd + acting set) is unchanged */
	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;

	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* remember the full acting set for the unchanged-check above */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		/* unlink from the old osd */
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd)
			goto out;

		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
	}
	err = 1;	/* mapping changed */

out:
	return err;
}
864
865
866
867
/*
 * (Re)map a request and hand its message to the messenger.  If the pg
 * currently has no usable osd, request a newer osdmap and return 0;
 * the request will be resent when a new map arrives.
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;
	int err;

	err = __map_osds(osdc, req);
	if (err < 0)
		return err;
	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
		return 0;
	}

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	/* refresh the fields that may change on each (re)send */
	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);	/* e.g. RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}
899
900
901
902
903
904
905
906
907
908
/*
 * Periodic timeout work: retry requests whose previous send failed,
 * reset osds holding requests that have exceeded the osd timeout, and
 * send keepalives on connections with merely slow requests.
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct rb_node *p;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	/* retry any requests flagged for resend after an earlier failure */
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			int err;

			dout("osdc resending prev failed %lld\n", req->r_tid);
			err = __send_request(osdc, req);
			if (err)
				dout("osdc failed again on %lld\n", req->r_tid);
			else
				req->r_resend = false;
			continue;
		}
	}

	/*
	 * Reset osds whose oldest request has exceeded the timeout.
	 * req_lru is ordered by r_stamp (oldest first); the BUG_ON
	 * guards against spinning on the same request without progress.
	 * timeout == 0 disables this check entirely.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_requests(osdc, osd);
	}

	/*
	 * Collect the osds with requests older than the keepalive
	 * interval onto a private list, then ping each once (an osd may
	 * appear for several slow requests).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;	/* lru order: the rest are newer */

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);

	up_read(&osdc->map_sem);
}
997
/*
 * Periodic work to reap idle osds from the lru; reschedules itself at
 * one quarter of the idle ttl.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}
1014
1015
1016
1017
1018
/*
 * Handle an OSD op reply: match it to a pending request by tid, record
 * the result, and complete/unregister the request as appropriate.  A
 * write may see two replies (an ack, then an ondisk/commit).
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	/* front must be exactly head + oid + op array */
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our reply message, drop our
	 * reference to the connection; it's the messenger's now.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		/* first reply for this request: record the result */
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;	/* success: report data length */

		/* in case we need to replay this op, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	/* unregister on error, on a read, or once a write is on disk */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_safe_callback)
			req->r_safe_callback(req, msg);
		complete_all(&req->r_safe_completion);  /* "safe" waiters */
	}

done:
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}
1111
1112
/*
 * Resend requests after an osd reset or a new osdmap.  If kickosd is
 * non-NULL, reset only that osd and kick its requests (plus any with
 * r_resend set); otherwise reset every osd whose state no longer
 * matches the map and remap all requests.
 *
 * Returns the number of requests that currently map to no usable osd
 * (the caller should then request a newer map).  Called with
 * request_mutex held.
 */
static int __kick_requests(struct ceph_osd_client *osdc,
			   struct ceph_osd *kickosd)
{
	struct ceph_osd_request *req;
	struct rb_node *p, *n;
	int needmap = 0;
	int err;

	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
	if (kickosd) {
		err = __reset_osd(osdc, kickosd);
		if (err == -EAGAIN)
			return 1;	/* msgr will retry; ask for a map */
	} else {
		/* reset any osds that are down or whose address changed */
		for (p = rb_first(&osdc->osds); p; p = n) {
			struct ceph_osd *osd =
				rb_entry(p, struct ceph_osd, o_node);

			n = rb_next(p);	/* __reset_osd may remove osd */
			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
			    memcmp(&osd->o_con.peer_addr,
				   ceph_osd_addr(osdc->osdmap,
						 osd->o_osd),
				   sizeof(struct ceph_entity_addr)) != 0)
				__reset_osd(osdc, osd);
		}
	}

	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			dout(" r_resend set on tid %llu\n", req->r_tid);
			__cancel_request(req);
			goto kick;
		}
		if (req->r_osd && kickosd == req->r_osd) {
			__cancel_request(req);
			goto kick;
		}

		err = __map_osds(osdc, req);
		if (err == 0)
			continue;	/* mapping unchanged; nothing to do */
		if (err < 0) {
			/* mapping failed; mark for retry from the
			 * timeout handler */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
			continue;
		}
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;	/* need a newer map */
			continue;
		}

kick:
		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
		err = __send_request(osdc, req);
		if (err) {
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
		}
	}

	return needmap;
}
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199static void kick_requests(struct ceph_osd_client *osdc,
1200 struct ceph_osd *kickosd)
1201{
1202 int needmap;
1203
1204 mutex_lock(&osdc->request_mutex);
1205 needmap = __kick_requests(osdc, kickosd);
1206 mutex_unlock(&osdc->request_mutex);
1207
1208 if (needmap) {
1209 dout("%d requests for down osds, need new map\n", needmap);
1210 ceph_monc_request_next_osdmap(&osdc->client->monc);
1211 }
1212
1213}
1214
1215
1216
1217
1218
1219
1220
/*
 * Process an incoming osd map message: apply any incremental maps that
 * extend our current epoch, fall back to decoding a full map, then kick
 * requests so they are remapped/resent against the new map.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
		} else {
			/* not the successor of our epoch; skip it */
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;	/* incrementals brought us up to date */

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			/* only the last full map matters */
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
		}
		p += maplen;
		nr_maps--;
	}

done:
	/* keep the map readable while we kick/resend */
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
	if (newmap)
		kick_requests(osdc, NULL);	/* resend against new map */
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}
1323
1324
1325
1326
/*
 * Register a request with the osd client and send the initial attempt.
 *
 * @nofail: if the initial send fails, keep the request registered and
 *          mark it for resend instead of returning the error.
 * Returns 0 on success (or deferred resend), negative errno otherwise.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	/* hand the data payload descriptors over to the outgoing message */
	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * Only send if nothing else (e.g. a concurrent map-driven kick)
	 * has already sent this request while it was registered but
	 * before we took request_mutex.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				/* leave registered; resend on next kick */
				dout("osdc_start_request failed send, "
				     " marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
1367
1368
1369
1370
1371int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1372 struct ceph_osd_request *req)
1373{
1374 int rc;
1375
1376 rc = wait_for_completion_interruptible(&req->r_completion);
1377 if (rc < 0) {
1378 mutex_lock(&osdc->request_mutex);
1379 __cancel_request(req);
1380 __unregister_request(osdc, req);
1381 mutex_unlock(&osdc->request_mutex);
1382 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1383 return rc;
1384 }
1385
1386 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1387 return req->r_result;
1388}
1389EXPORT_SYMBOL(ceph_osdc_wait_request);
1390
1391
1392
1393
/*
 * Wait for all in-flight write requests (up through the last_tid
 * sampled at entry) to reach stable storage.  New requests submitted
 * while we sleep are not waited for.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		/* only wait for requests that pre-dated this sync */
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		/* reads have no on-disk commit to wait for */
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		/*
		 * Hold a ref so the request cannot be freed while we
		 * drop request_mutex to sleep on its safe completion.
		 */
		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);
1424
1425
1426
1427
1428int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1429{
1430 int err;
1431
1432 dout("init\n");
1433 osdc->client = client;
1434 osdc->osdmap = NULL;
1435 init_rwsem(&osdc->map_sem);
1436 init_completion(&osdc->map_waiters);
1437 osdc->last_requested_map = 0;
1438 mutex_init(&osdc->request_mutex);
1439 osdc->last_tid = 0;
1440 osdc->osds = RB_ROOT;
1441 INIT_LIST_HEAD(&osdc->osd_lru);
1442 osdc->requests = RB_ROOT;
1443 INIT_LIST_HEAD(&osdc->req_lru);
1444 osdc->num_requests = 0;
1445 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1446 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1447
1448 schedule_delayed_work(&osdc->osds_timeout_work,
1449 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1450
1451 err = -ENOMEM;
1452 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1453 sizeof(struct ceph_osd_request));
1454 if (!osdc->req_mempool)
1455 goto out;
1456
1457 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1458 "osd_op");
1459 if (err < 0)
1460 goto out_mempool;
1461 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1462 OSD_OPREPLY_FRONT_LEN, 10, true,
1463 "osd_op_reply");
1464 if (err < 0)
1465 goto out_msgpool;
1466 return 0;
1467
1468out_msgpool:
1469 ceph_msgpool_destroy(&osdc->msgpool_op);
1470out_mempool:
1471 mempool_destroy(osdc->req_mempool);
1472out:
1473 return err;
1474}
1475EXPORT_SYMBOL(ceph_osdc_init);
1476
/*
 * Tear down an osd client previously set up by ceph_osdc_init().
 * The timers are cancelled synchronously first so no work can fire
 * against state we are about to free.
 */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	/* NOTE(review): '1' presumably forces removal of all cached osd
	 * sessions regardless of idle age — confirm against
	 * remove_old_osds() */
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);
1491
1492
1493
1494
1495
1496int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1497 struct ceph_vino vino, struct ceph_file_layout *layout,
1498 u64 off, u64 *plen,
1499 u32 truncate_seq, u64 truncate_size,
1500 struct page **pages, int num_pages, int page_align)
1501{
1502 struct ceph_osd_request *req;
1503 int rc = 0;
1504
1505 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1506 vino.snap, off, *plen);
1507 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1508 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1509 NULL, 0, truncate_seq, truncate_size, NULL,
1510 false, 1, page_align);
1511 if (!req)
1512 return -ENOMEM;
1513
1514
1515 req->r_pages = pages;
1516
1517 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1518 off, *plen, req->r_num_pages, page_align);
1519
1520 rc = ceph_osdc_start_request(osdc, req, false);
1521 if (!rc)
1522 rc = ceph_osdc_wait_request(osdc, req);
1523
1524 ceph_osdc_put_request(req);
1525 dout("readpages result %d\n", rc);
1526 return rc;
1527}
1528EXPORT_SYMBOL(ceph_osdc_readpages);
1529
1530
1531
1532
1533int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1534 struct ceph_file_layout *layout,
1535 struct ceph_snap_context *snapc,
1536 u64 off, u64 len,
1537 u32 truncate_seq, u64 truncate_size,
1538 struct timespec *mtime,
1539 struct page **pages, int num_pages,
1540 int flags, int do_sync, bool nofail)
1541{
1542 struct ceph_osd_request *req;
1543 int rc = 0;
1544 int page_align = off & ~PAGE_MASK;
1545
1546 BUG_ON(vino.snap != CEPH_NOSNAP);
1547 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1548 CEPH_OSD_OP_WRITE,
1549 flags | CEPH_OSD_FLAG_ONDISK |
1550 CEPH_OSD_FLAG_WRITE,
1551 snapc, do_sync,
1552 truncate_seq, truncate_size, mtime,
1553 nofail, 1, page_align);
1554 if (!req)
1555 return -ENOMEM;
1556
1557
1558 req->r_pages = pages;
1559 dout("writepages %llu~%llu (%d pages)\n", off, len,
1560 req->r_num_pages);
1561
1562 rc = ceph_osdc_start_request(osdc, req, nofail);
1563 if (!rc)
1564 rc = ceph_osdc_wait_request(osdc, req);
1565
1566 ceph_osdc_put_request(req);
1567 if (rc == 0)
1568 rc = len;
1569 dout("writepages result %d\n", rc);
1570 return rc;
1571}
1572EXPORT_SYMBOL(ceph_osdc_writepages);
1573
1574
1575
1576
1577static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1578{
1579 struct ceph_osd *osd = con->private;
1580 struct ceph_osd_client *osdc;
1581 int type = le16_to_cpu(msg->hdr.type);
1582
1583 if (!osd)
1584 goto out;
1585 osdc = osd->o_osdc;
1586
1587 switch (type) {
1588 case CEPH_MSG_OSD_MAP:
1589 ceph_osdc_handle_map(osdc, msg);
1590 break;
1591 case CEPH_MSG_OSD_OPREPLY:
1592 handle_reply(osdc, msg, con);
1593 break;
1594
1595 default:
1596 pr_err("received unknown message type %d %s\n", type,
1597 ceph_msg_type_name(type));
1598 }
1599out:
1600 ceph_msg_put(msg);
1601}
1602
1603
1604
1605
1606
1607static struct ceph_msg *get_reply(struct ceph_connection *con,
1608 struct ceph_msg_header *hdr,
1609 int *skip)
1610{
1611 struct ceph_osd *osd = con->private;
1612 struct ceph_osd_client *osdc = osd->o_osdc;
1613 struct ceph_msg *m;
1614 struct ceph_osd_request *req;
1615 int front = le32_to_cpu(hdr->front_len);
1616 int data_len = le32_to_cpu(hdr->data_len);
1617 u64 tid;
1618
1619 tid = le64_to_cpu(hdr->tid);
1620 mutex_lock(&osdc->request_mutex);
1621 req = __lookup_request(osdc, tid);
1622 if (!req) {
1623 *skip = 1;
1624 m = NULL;
1625 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
1626 osd->o_osd);
1627 goto out;
1628 }
1629
1630 if (req->r_con_filling_msg) {
1631 dout("get_reply revoking msg %p from old con %p\n",
1632 req->r_reply, req->r_con_filling_msg);
1633 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1634 ceph_con_put(req->r_con_filling_msg);
1635 req->r_con_filling_msg = NULL;
1636 }
1637
1638 if (front > req->r_reply->front.iov_len) {
1639 pr_warning("get_reply front %d > preallocated %d\n",
1640 front, (int)req->r_reply->front.iov_len);
1641 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
1642 if (!m)
1643 goto out;
1644 ceph_msg_put(req->r_reply);
1645 req->r_reply = m;
1646 }
1647 m = ceph_msg_get(req->r_reply);
1648
1649 if (data_len > 0) {
1650 int want = calc_pages_for(req->r_page_alignment, data_len);
1651
1652 if (unlikely(req->r_num_pages < want)) {
1653 pr_warning("tid %lld reply %d > expected %d pages\n",
1654 tid, want, m->nr_pages);
1655 *skip = 1;
1656 ceph_msg_put(m);
1657 m = NULL;
1658 goto out;
1659 }
1660 m->pages = req->r_pages;
1661 m->nr_pages = req->r_num_pages;
1662 m->page_alignment = req->r_page_alignment;
1663#ifdef CONFIG_BLOCK
1664 m->bio = req->r_bio;
1665#endif
1666 }
1667 *skip = 0;
1668 req->r_con_filling_msg = ceph_con_get(con);
1669 dout("get_reply tid %lld %p\n", tid, m);
1670
1671out:
1672 mutex_unlock(&osdc->request_mutex);
1673 return m;
1674
1675}
1676
1677static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1678 struct ceph_msg_header *hdr,
1679 int *skip)
1680{
1681 struct ceph_osd *osd = con->private;
1682 int type = le16_to_cpu(hdr->type);
1683 int front = le32_to_cpu(hdr->front_len);
1684
1685 switch (type) {
1686 case CEPH_MSG_OSD_MAP:
1687 return ceph_msg_new(type, front, GFP_NOFS);
1688 case CEPH_MSG_OSD_OPREPLY:
1689 return get_reply(con, hdr, skip);
1690 default:
1691 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1692 osd->o_osd);
1693 *skip = 1;
1694 return NULL;
1695 }
1696}
1697
1698
1699
1700
1701static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1702{
1703 struct ceph_osd *osd = con->private;
1704 if (get_osd(osd))
1705 return con;
1706 return NULL;
1707}
1708
1709static void put_osd_con(struct ceph_connection *con)
1710{
1711 struct ceph_osd *osd = con->private;
1712 put_osd(osd);
1713}
1714
1715
1716
1717
/*
 * Provide (and lazily create) the authorizer this osd session presents
 * to the peer during the connection handshake.
 *
 * @force_new: discard any cached authorizer and build a fresh one.
 * Returns 0 and fills proto/buf/len/reply_buf/reply_len, or a negative
 * error from create_authorizer.  The returned buffers remain owned by
 * the osd session (o_authorizer_*), not the caller.
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	if (force_new && o->o_authorizer) {
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	/* build one on first use (or after it was just discarded) */
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	/* hand out pointers into the cached authorizer state */
	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}
1750
1751
1752static int verify_authorizer_reply(struct ceph_connection *con, int len)
1753{
1754 struct ceph_osd *o = con->private;
1755 struct ceph_osd_client *osdc = o->o_osdc;
1756 struct ceph_auth_client *ac = osdc->client->monc.auth;
1757
1758 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1759}
1760
/*
 * The peer rejected our authorizer: invalidate the osd ticket (if the
 * auth method supports it) and ask the monitor client to renew our
 * authentication.
 */
static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	/* not every auth implementation provides invalidation */
	if (ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}
1772
/* connection callbacks the messenger invokes for osd sessions */
static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,	/* defined earlier in this file */
};
1783