1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_OSC
38
39#include <linux/libcfs/libcfs.h>
40
41
42#include <lustre_dlm.h>
43#include <lustre_net.h>
44#include <lustre/lustre_user.h>
45#include <obd_cksum.h>
46#include <obd_ost.h>
47#include <obd_lov.h>
48
49#ifdef __CYGWIN__
50# include <ctype.h>
51#endif
52
53#include <lustre_ha.h>
54#include <lprocfs_status.h>
55#include <lustre_log.h>
56#include <lustre_debug.h>
57#include <lustre_param.h>
58#include <lustre_fid.h>
59#include "osc_internal.h"
60#include "osc_cl_internal.h"
61
62static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63static int brw_interpret(const struct lu_env *env,
64 struct ptlrpc_request *req, void *data, int rc);
65int osc_cleanup(struct obd_device *obd);
66
67
68static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69 struct lov_stripe_md *lsm)
70{
71 int lmm_size;
72
73 lmm_size = sizeof(**lmmp);
74 if (lmmp == NULL)
75 return lmm_size;
76
77 if (*lmmp != NULL && lsm == NULL) {
78 OBD_FREE(*lmmp, lmm_size);
79 *lmmp = NULL;
80 return 0;
81 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
82 return -EBADF;
83 }
84
85 if (*lmmp == NULL) {
86 OBD_ALLOC(*lmmp, lmm_size);
87 if (*lmmp == NULL)
88 return -ENOMEM;
89 }
90
91 if (lsm)
92 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
93
94 return lmm_size;
95}
96
97
98static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
99 struct lov_mds_md *lmm, int lmm_bytes)
100{
101 int lsm_size;
102 struct obd_import *imp = class_exp2cliimp(exp);
103
104 if (lmm != NULL) {
105 if (lmm_bytes < sizeof(*lmm)) {
106 CERROR("%s: lov_mds_md too small: %d, need %d\n",
107 exp->exp_obd->obd_name, lmm_bytes,
108 (int)sizeof(*lmm));
109 return -EINVAL;
110 }
111
112
113 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
114 CERROR("%s: zero lmm_object_id: rc = %d\n",
115 exp->exp_obd->obd_name, -EINVAL);
116 return -EINVAL;
117 }
118 }
119
120 lsm_size = lov_stripe_md_size(1);
121 if (lsmp == NULL)
122 return lsm_size;
123
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
127 *lsmp = NULL;
128 return 0;
129 }
130
131 if (*lsmp == NULL) {
132 OBD_ALLOC(*lsmp, lsm_size);
133 if (unlikely(*lsmp == NULL))
134 return -ENOMEM;
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
137 OBD_FREE(*lsmp, lsm_size);
138 return -ENOMEM;
139 }
140 loi_init((*lsmp)->lsm_oinfo[0]);
141 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
142 return -EBADF;
143 }
144
145 if (lmm != NULL)
146
147 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
148
149 if (imp != NULL &&
150 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
151 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
152 else
153 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
154
155 return lsm_size;
156}
157
158static inline void osc_pack_capa(struct ptlrpc_request *req,
159 struct ost_body *body, void *capa)
160{
161 struct obd_capa *oc = (struct obd_capa *)capa;
162 struct lustre_capa *c;
163
164 if (!capa)
165 return;
166
167 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
168 LASSERT(c);
169 capa_cpy(c, oc);
170 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
171 DEBUG_CAPA(D_SEC, c, "pack");
172}
173
174static inline void osc_pack_req_body(struct ptlrpc_request *req,
175 struct obd_info *oinfo)
176{
177 struct ost_body *body;
178
179 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
180 LASSERT(body);
181
182 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
183 oinfo->oi_oa);
184 osc_pack_capa(req, body, oinfo->oi_capa);
185}
186
187static inline void osc_set_capa_size(struct ptlrpc_request *req,
188 const struct req_msg_field *field,
189 struct obd_capa *oc)
190{
191 if (oc == NULL)
192 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
193 else
194
195 ;
196}
197
198static int osc_getattr_interpret(const struct lu_env *env,
199 struct ptlrpc_request *req,
200 struct osc_async_args *aa, int rc)
201{
202 struct ost_body *body;
203
204 if (rc != 0)
205 GOTO(out, rc);
206
207 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
208 if (body) {
209 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
210 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
211 aa->aa_oi->oi_oa, &body->oa);
212
213
214 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
215 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
216 } else {
217 CDEBUG(D_INFO, "can't unpack ost_body\n");
218 rc = -EPROTO;
219 aa->aa_oi->oi_oa->o_valid = 0;
220 }
221out:
222 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
223 return rc;
224}
225
226static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
227 struct ptlrpc_request_set *set)
228{
229 struct ptlrpc_request *req;
230 struct osc_async_args *aa;
231 int rc;
232
233 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
234 if (req == NULL)
235 return -ENOMEM;
236
237 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
238 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
239 if (rc) {
240 ptlrpc_request_free(req);
241 return rc;
242 }
243
244 osc_pack_req_body(req, oinfo);
245
246 ptlrpc_request_set_replen(req);
247 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
248
249 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
250 aa = ptlrpc_req_async_args(req);
251 aa->aa_oi = oinfo;
252
253 ptlrpc_set_add_req(set, req);
254 return 0;
255}
256
257static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
258 struct obd_info *oinfo)
259{
260 struct ptlrpc_request *req;
261 struct ost_body *body;
262 int rc;
263
264 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
265 if (req == NULL)
266 return -ENOMEM;
267
268 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
269 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
270 if (rc) {
271 ptlrpc_request_free(req);
272 return rc;
273 }
274
275 osc_pack_req_body(req, oinfo);
276
277 ptlrpc_request_set_replen(req);
278
279 rc = ptlrpc_queue_wait(req);
280 if (rc)
281 GOTO(out, rc);
282
283 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
284 if (body == NULL)
285 GOTO(out, rc = -EPROTO);
286
287 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
288 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
289 &body->oa);
290
291 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
292 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
293
294 out:
295 ptlrpc_req_finished(req);
296 return rc;
297}
298
299static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
300 struct obd_info *oinfo, struct obd_trans_info *oti)
301{
302 struct ptlrpc_request *req;
303 struct ost_body *body;
304 int rc;
305
306 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
307
308 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
309 if (req == NULL)
310 return -ENOMEM;
311
312 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
313 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
314 if (rc) {
315 ptlrpc_request_free(req);
316 return rc;
317 }
318
319 osc_pack_req_body(req, oinfo);
320
321 ptlrpc_request_set_replen(req);
322
323 rc = ptlrpc_queue_wait(req);
324 if (rc)
325 GOTO(out, rc);
326
327 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
328 if (body == NULL)
329 GOTO(out, rc = -EPROTO);
330
331 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
332 &body->oa);
333
334out:
335 ptlrpc_req_finished(req);
336 return rc;
337}
338
339static int osc_setattr_interpret(const struct lu_env *env,
340 struct ptlrpc_request *req,
341 struct osc_setattr_args *sa, int rc)
342{
343 struct ost_body *body;
344
345 if (rc != 0)
346 GOTO(out, rc);
347
348 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
349 if (body == NULL)
350 GOTO(out, rc = -EPROTO);
351
352 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
353 &body->oa);
354out:
355 rc = sa->sa_upcall(sa->sa_cookie, rc);
356 return rc;
357}
358
359int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
360 struct obd_trans_info *oti,
361 obd_enqueue_update_f upcall, void *cookie,
362 struct ptlrpc_request_set *rqset)
363{
364 struct ptlrpc_request *req;
365 struct osc_setattr_args *sa;
366 int rc;
367
368 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
369 if (req == NULL)
370 return -ENOMEM;
371
372 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
373 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
374 if (rc) {
375 ptlrpc_request_free(req);
376 return rc;
377 }
378
379 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
380 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
381
382 osc_pack_req_body(req, oinfo);
383
384 ptlrpc_request_set_replen(req);
385
386
387 if (!rqset) {
388
389 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
390 } else {
391 req->rq_interpret_reply =
392 (ptlrpc_interpterer_t)osc_setattr_interpret;
393
394 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
395 sa = ptlrpc_req_async_args(req);
396 sa->sa_oa = oinfo->oi_oa;
397 sa->sa_upcall = upcall;
398 sa->sa_cookie = cookie;
399
400 if (rqset == PTLRPCD_SET)
401 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
402 else
403 ptlrpc_set_add_req(rqset, req);
404 }
405
406 return 0;
407}
408
409static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
410 struct obd_trans_info *oti,
411 struct ptlrpc_request_set *rqset)
412{
413 return osc_setattr_async_base(exp, oinfo, oti,
414 oinfo->oi_cb_up, oinfo, rqset);
415}
416
417int osc_real_create(struct obd_export *exp, struct obdo *oa,
418 struct lov_stripe_md **ea, struct obd_trans_info *oti)
419{
420 struct ptlrpc_request *req;
421 struct ost_body *body;
422 struct lov_stripe_md *lsm;
423 int rc;
424
425 LASSERT(oa);
426 LASSERT(ea);
427
428 lsm = *ea;
429 if (!lsm) {
430 rc = obd_alloc_memmd(exp, &lsm);
431 if (rc < 0)
432 return rc;
433 }
434
435 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
436 if (req == NULL)
437 GOTO(out, rc = -ENOMEM);
438
439 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
440 if (rc) {
441 ptlrpc_request_free(req);
442 GOTO(out, rc);
443 }
444
445 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
446 LASSERT(body);
447
448 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
449
450 ptlrpc_request_set_replen(req);
451
452 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
453 oa->o_flags == OBD_FL_DELORPHAN) {
454 DEBUG_REQ(D_HA, req,
455 "delorphan from OST integration");
456
457 req->rq_no_resend = req->rq_no_delay = 1;
458 }
459
460 rc = ptlrpc_queue_wait(req);
461 if (rc)
462 GOTO(out_req, rc);
463
464 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
465 if (body == NULL)
466 GOTO(out_req, rc = -EPROTO);
467
468 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
469 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
470
471 oa->o_blksize = cli_brw_size(exp->exp_obd);
472 oa->o_valid |= OBD_MD_FLBLKSZ;
473
474
475
476
477
478 lsm->lsm_oi = oa->o_oi;
479 *ea = lsm;
480
481 if (oti != NULL) {
482 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
483
484 if (oa->o_valid & OBD_MD_FLCOOKIE) {
485 if (!oti->oti_logcookies)
486 oti_alloc_cookies(oti, 1);
487 *oti->oti_logcookies = oa->o_lcookie;
488 }
489 }
490
491 CDEBUG(D_HA, "transno: "LPD64"\n",
492 lustre_msg_get_transno(req->rq_repmsg));
493out_req:
494 ptlrpc_req_finished(req);
495out:
496 if (rc && !*ea)
497 obd_free_memmd(exp, &lsm);
498 return rc;
499}
500
501int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
502 obd_enqueue_update_f upcall, void *cookie,
503 struct ptlrpc_request_set *rqset)
504{
505 struct ptlrpc_request *req;
506 struct osc_setattr_args *sa;
507 struct ost_body *body;
508 int rc;
509
510 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
511 if (req == NULL)
512 return -ENOMEM;
513
514 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
515 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
516 if (rc) {
517 ptlrpc_request_free(req);
518 return rc;
519 }
520 req->rq_request_portal = OST_IO_PORTAL;
521 ptlrpc_at_set_req_timeout(req);
522
523 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
524 LASSERT(body);
525 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
526 oinfo->oi_oa);
527 osc_pack_capa(req, body, oinfo->oi_capa);
528
529 ptlrpc_request_set_replen(req);
530
531 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
532 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
533 sa = ptlrpc_req_async_args(req);
534 sa->sa_oa = oinfo->oi_oa;
535 sa->sa_upcall = upcall;
536 sa->sa_cookie = cookie;
537 if (rqset == PTLRPCD_SET)
538 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
539 else
540 ptlrpc_set_add_req(rqset, req);
541
542 return 0;
543}
544
545static int osc_punch(const struct lu_env *env, struct obd_export *exp,
546 struct obd_info *oinfo, struct obd_trans_info *oti,
547 struct ptlrpc_request_set *rqset)
548{
549 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
550 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
551 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
552 return osc_punch_base(exp, oinfo,
553 oinfo->oi_cb_up, oinfo, rqset);
554}
555
556static int osc_sync_interpret(const struct lu_env *env,
557 struct ptlrpc_request *req,
558 void *arg, int rc)
559{
560 struct osc_fsync_args *fa = arg;
561 struct ost_body *body;
562
563 if (rc)
564 GOTO(out, rc);
565
566 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
567 if (body == NULL) {
568 CERROR ("can't unpack ost_body\n");
569 GOTO(out, rc = -EPROTO);
570 }
571
572 *fa->fa_oi->oi_oa = body->oa;
573out:
574 rc = fa->fa_upcall(fa->fa_cookie, rc);
575 return rc;
576}
577
578int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
579 obd_enqueue_update_f upcall, void *cookie,
580 struct ptlrpc_request_set *rqset)
581{
582 struct ptlrpc_request *req;
583 struct ost_body *body;
584 struct osc_fsync_args *fa;
585 int rc;
586
587 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
588 if (req == NULL)
589 return -ENOMEM;
590
591 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
592 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
593 if (rc) {
594 ptlrpc_request_free(req);
595 return rc;
596 }
597
598
599 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
600 LASSERT(body);
601 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
602 oinfo->oi_oa);
603 osc_pack_capa(req, body, oinfo->oi_capa);
604
605 ptlrpc_request_set_replen(req);
606 req->rq_interpret_reply = osc_sync_interpret;
607
608 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
609 fa = ptlrpc_req_async_args(req);
610 fa->fa_oi = oinfo;
611 fa->fa_upcall = upcall;
612 fa->fa_cookie = cookie;
613
614 if (rqset == PTLRPCD_SET)
615 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
616 else
617 ptlrpc_set_add_req(rqset, req);
618
619 return 0;
620}
621
622static int osc_sync(const struct lu_env *env, struct obd_export *exp,
623 struct obd_info *oinfo, obd_size start, obd_size end,
624 struct ptlrpc_request_set *set)
625{
626 if (!oinfo->oi_oa) {
627 CDEBUG(D_INFO, "oa NULL\n");
628 return -EINVAL;
629 }
630
631 oinfo->oi_oa->o_size = start;
632 oinfo->oi_oa->o_blocks = end;
633 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
634
635 return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
636}
637
638
639
640
641static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
642 struct list_head *cancels,
643 ldlm_mode_t mode, int lock_flags)
644{
645 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
646 struct ldlm_res_id res_id;
647 struct ldlm_resource *res;
648 int count;
649
650
651
652
653
654
655
656 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
657 return 0;
658
659 ostid_build_res_name(&oa->o_oi, &res_id);
660 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
661 if (res == NULL)
662 return 0;
663
664 LDLM_RESOURCE_ADDREF(res);
665 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
666 lock_flags, 0, NULL);
667 LDLM_RESOURCE_DELREF(res);
668 ldlm_resource_putref(res);
669 return count;
670}
671
672static int osc_destroy_interpret(const struct lu_env *env,
673 struct ptlrpc_request *req, void *data,
674 int rc)
675{
676 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
677
678 atomic_dec(&cli->cl_destroy_in_flight);
679 wake_up(&cli->cl_destroy_waitq);
680 return 0;
681}
682
683static int osc_can_send_destroy(struct client_obd *cli)
684{
685 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
686 cli->cl_max_rpcs_in_flight) {
687
688 return 1;
689 }
690 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
691 cli->cl_max_rpcs_in_flight) {
692
693
694
695
696 wake_up(&cli->cl_destroy_waitq);
697 }
698 return 0;
699}
700
701int osc_create(const struct lu_env *env, struct obd_export *exp,
702 struct obdo *oa, struct lov_stripe_md **ea,
703 struct obd_trans_info *oti)
704{
705 int rc = 0;
706
707 LASSERT(oa);
708 LASSERT(ea);
709 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
710
711 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
712 oa->o_flags == OBD_FL_RECREATE_OBJS) {
713 return osc_real_create(exp, oa, ea, oti);
714 }
715
716 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
717 return osc_real_create(exp, oa, ea, oti);
718
719
720 LBUG();
721
722 return rc;
723}
724
725
726
727
728
729
730
731
732
733
734
735static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
736 struct obdo *oa, struct lov_stripe_md *ea,
737 struct obd_trans_info *oti, struct obd_export *md_export,
738 void *capa)
739{
740 struct client_obd *cli = &exp->exp_obd->u.cli;
741 struct ptlrpc_request *req;
742 struct ost_body *body;
743 LIST_HEAD(cancels);
744 int rc, count;
745
746 if (!oa) {
747 CDEBUG(D_INFO, "oa NULL\n");
748 return -EINVAL;
749 }
750
751 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
752 LDLM_FL_DISCARD_DATA);
753
754 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
755 if (req == NULL) {
756 ldlm_lock_list_put(&cancels, l_bl_ast, count);
757 return -ENOMEM;
758 }
759
760 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
761 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
762 0, &cancels, count);
763 if (rc) {
764 ptlrpc_request_free(req);
765 return rc;
766 }
767
768 req->rq_request_portal = OST_IO_PORTAL;
769 ptlrpc_at_set_req_timeout(req);
770
771 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
772 oa->o_lcookie = *oti->oti_logcookies;
773 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
774 LASSERT(body);
775 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
776
777 osc_pack_capa(req, body, (struct obd_capa *)capa);
778 ptlrpc_request_set_replen(req);
779
780
781
782
783
784 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
785 req->rq_interpret_reply = osc_destroy_interpret;
786 if (!osc_can_send_destroy(cli)) {
787 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
788 NULL);
789
790
791
792
793
794 l_wait_event_exclusive(cli->cl_destroy_waitq,
795 osc_can_send_destroy(cli), &lwi);
796 }
797 }
798
799
800 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
801 return 0;
802}
803
804static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
805 long writing_bytes)
806{
807 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
808
809 LASSERT(!(oa->o_valid & bits));
810
811 oa->o_valid |= bits;
812 client_obd_list_lock(&cli->cl_loi_list_lock);
813 oa->o_dirty = cli->cl_dirty;
814 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
815 cli->cl_dirty_max)) {
816 CERROR("dirty %lu - %lu > dirty_max %lu\n",
817 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
818 oa->o_undirty = 0;
819 } else if (unlikely(atomic_read(&obd_dirty_pages) -
820 atomic_read(&obd_dirty_transit_pages) >
821 (long)(obd_max_dirty_pages + 1))) {
822
823
824
825 CERROR("dirty %d - %d > system dirty_max %d\n",
826 atomic_read(&obd_dirty_pages),
827 atomic_read(&obd_dirty_transit_pages),
828 obd_max_dirty_pages);
829 oa->o_undirty = 0;
830 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
831 CERROR("dirty %lu - dirty_max %lu too big???\n",
832 cli->cl_dirty, cli->cl_dirty_max);
833 oa->o_undirty = 0;
834 } else {
835 long max_in_flight = (cli->cl_max_pages_per_rpc <<
836 PAGE_CACHE_SHIFT)*
837 (cli->cl_max_rpcs_in_flight + 1);
838 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
839 }
840 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
841 oa->o_dropped = cli->cl_lost_grant;
842 cli->cl_lost_grant = 0;
843 client_obd_list_unlock(&cli->cl_loi_list_lock);
844 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
845 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
846
847}
848
849void osc_update_next_shrink(struct client_obd *cli)
850{
851 cli->cl_next_shrink_grant =
852 cfs_time_shift(cli->cl_grant_shrink_interval);
853 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
854 cli->cl_next_shrink_grant);
855}
856
857static void __osc_update_grant(struct client_obd *cli, obd_size grant)
858{
859 client_obd_list_lock(&cli->cl_loi_list_lock);
860 cli->cl_avail_grant += grant;
861 client_obd_list_unlock(&cli->cl_loi_list_lock);
862}
863
864static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
865{
866 if (body->oa.o_valid & OBD_MD_FLGRANT) {
867 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
868 __osc_update_grant(cli, body->oa.o_grant);
869 }
870}
871
872static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
873 obd_count keylen, void *key, obd_count vallen,
874 void *val, struct ptlrpc_request_set *set);
875
876static int osc_shrink_grant_interpret(const struct lu_env *env,
877 struct ptlrpc_request *req,
878 void *aa, int rc)
879{
880 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
881 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
882 struct ost_body *body;
883
884 if (rc != 0) {
885 __osc_update_grant(cli, oa->o_grant);
886 GOTO(out, rc);
887 }
888
889 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
890 LASSERT(body);
891 osc_update_grant(cli, body);
892out:
893 OBDO_FREE(oa);
894 return rc;
895}
896
897static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
898{
899 client_obd_list_lock(&cli->cl_loi_list_lock);
900 oa->o_grant = cli->cl_avail_grant / 4;
901 cli->cl_avail_grant -= oa->o_grant;
902 client_obd_list_unlock(&cli->cl_loi_list_lock);
903 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
904 oa->o_valid |= OBD_MD_FLFLAGS;
905 oa->o_flags = 0;
906 }
907 oa->o_flags |= OBD_FL_SHRINK_GRANT;
908 osc_update_next_shrink(cli);
909}
910
911
912
913
914
915static int osc_shrink_grant(struct client_obd *cli)
916{
917 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
918 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
919
920 client_obd_list_lock(&cli->cl_loi_list_lock);
921 if (cli->cl_avail_grant <= target_bytes)
922 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
923 client_obd_list_unlock(&cli->cl_loi_list_lock);
924
925 return osc_shrink_grant_to_target(cli, target_bytes);
926}
927
928int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
929{
930 int rc = 0;
931 struct ost_body *body;
932
933 client_obd_list_lock(&cli->cl_loi_list_lock);
934
935
936
937 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
938 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
939
940 if (target_bytes >= cli->cl_avail_grant) {
941 client_obd_list_unlock(&cli->cl_loi_list_lock);
942 return 0;
943 }
944 client_obd_list_unlock(&cli->cl_loi_list_lock);
945
946 OBD_ALLOC_PTR(body);
947 if (!body)
948 return -ENOMEM;
949
950 osc_announce_cached(cli, &body->oa, 0);
951
952 client_obd_list_lock(&cli->cl_loi_list_lock);
953 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
954 cli->cl_avail_grant = target_bytes;
955 client_obd_list_unlock(&cli->cl_loi_list_lock);
956 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
957 body->oa.o_valid |= OBD_MD_FLFLAGS;
958 body->oa.o_flags = 0;
959 }
960 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
961 osc_update_next_shrink(cli);
962
963 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
964 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
965 sizeof(*body), body, NULL);
966 if (rc != 0)
967 __osc_update_grant(cli, body->oa.o_grant);
968 OBD_FREE_PTR(body);
969 return rc;
970}
971
972static int osc_should_shrink_grant(struct client_obd *client)
973{
974 cfs_time_t time = cfs_time_current();
975 cfs_time_t next_shrink = client->cl_next_shrink_grant;
976
977 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
978 OBD_CONNECT_GRANT_SHRINK) == 0)
979 return 0;
980
981 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
982
983
984
985 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
986
987 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
988 client->cl_avail_grant > brw_size)
989 return 1;
990 else
991 osc_update_next_shrink(client);
992 }
993 return 0;
994}
995
996static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
997{
998 struct client_obd *client;
999
1000 list_for_each_entry(client, &item->ti_obd_list,
1001 cl_grant_shrink_list) {
1002 if (osc_should_shrink_grant(client))
1003 osc_shrink_grant(client);
1004 }
1005 return 0;
1006}
1007
1008static int osc_add_shrink_grant(struct client_obd *client)
1009{
1010 int rc;
1011
1012 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1013 TIMEOUT_GRANT,
1014 osc_grant_shrink_grant_cb, NULL,
1015 &client->cl_grant_shrink_list);
1016 if (rc) {
1017 CERROR("add grant client %s error %d\n",
1018 client->cl_import->imp_obd->obd_name, rc);
1019 return rc;
1020 }
1021 CDEBUG(D_CACHE, "add grant client %s \n",
1022 client->cl_import->imp_obd->obd_name);
1023 osc_update_next_shrink(client);
1024 return 0;
1025}
1026
1027static int osc_del_shrink_grant(struct client_obd *client)
1028{
1029 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1030 TIMEOUT_GRANT);
1031}
1032
1033static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1034{
1035
1036
1037
1038
1039
1040
1041
1042
1043 client_obd_list_lock(&cli->cl_loi_list_lock);
1044 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1045 cli->cl_avail_grant = ocd->ocd_grant;
1046 else
1047 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1048
1049 if (cli->cl_avail_grant < 0) {
1050 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1051 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1052 ocd->ocd_grant, cli->cl_dirty);
1053
1054
1055 cli->cl_avail_grant = ocd->ocd_grant;
1056 }
1057
1058
1059 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1060 client_obd_list_unlock(&cli->cl_loi_list_lock);
1061
1062 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1063 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1064 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1065
1066 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1067 list_empty(&cli->cl_grant_shrink_list))
1068 osc_add_shrink_grant(cli);
1069}
1070
1071
1072
1073
1074
1075static void handle_short_read(int nob_read, obd_count page_count,
1076 struct brw_page **pga)
1077{
1078 char *ptr;
1079 int i = 0;
1080
1081
1082 while (nob_read > 0) {
1083 LASSERT (page_count > 0);
1084
1085 if (pga[i]->count > nob_read) {
1086
1087 ptr = kmap(pga[i]->pg) +
1088 (pga[i]->off & ~CFS_PAGE_MASK);
1089 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1090 kunmap(pga[i]->pg);
1091 page_count--;
1092 i++;
1093 break;
1094 }
1095
1096 nob_read -= pga[i]->count;
1097 page_count--;
1098 i++;
1099 }
1100
1101
1102 while (page_count-- > 0) {
1103 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1104 memset(ptr, 0, pga[i]->count);
1105 kunmap(pga[i]->pg);
1106 i++;
1107 }
1108}
1109
1110static int check_write_rcs(struct ptlrpc_request *req,
1111 int requested_nob, int niocount,
1112 obd_count page_count, struct brw_page **pga)
1113{
1114 int i;
1115 __u32 *remote_rcs;
1116
1117 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1118 sizeof(*remote_rcs) *
1119 niocount);
1120 if (remote_rcs == NULL) {
1121 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1122 return(-EPROTO);
1123 }
1124
1125
1126 for (i = 0; i < niocount; i++) {
1127 if ((int)remote_rcs[i] < 0)
1128 return(remote_rcs[i]);
1129
1130 if (remote_rcs[i] != 0) {
1131 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1132 i, remote_rcs[i], req);
1133 return(-EPROTO);
1134 }
1135 }
1136
1137 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1138 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1139 req->rq_bulk->bd_nob_transferred, requested_nob);
1140 return(-EPROTO);
1141 }
1142
1143 return (0);
1144}
1145
1146static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1147{
1148 if (p1->flag != p2->flag) {
1149 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1150 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1151
1152
1153
1154 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1155 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1156 "report this at http://bugs.whamcloud.com/\n",
1157 p1->flag, p2->flag);
1158 }
1159 return 0;
1160 }
1161
1162 return (p1->off + p1->count == p2->off);
1163}
1164
1165static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1166 struct brw_page **pga, int opc,
1167 cksum_type_t cksum_type)
1168{
1169 __u32 cksum;
1170 int i = 0;
1171 struct cfs_crypto_hash_desc *hdesc;
1172 unsigned int bufsize;
1173 int err;
1174 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1175
1176 LASSERT(pg_count > 0);
1177
1178 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1179 if (IS_ERR(hdesc)) {
1180 CERROR("Unable to initialize checksum hash %s\n",
1181 cfs_crypto_hash_name(cfs_alg));
1182 return PTR_ERR(hdesc);
1183 }
1184
1185 while (nob > 0 && pg_count > 0) {
1186 int count = pga[i]->count > nob ? nob : pga[i]->count;
1187
1188
1189
1190 if (i == 0 && opc == OST_READ &&
1191 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1192 unsigned char *ptr = kmap(pga[i]->pg);
1193 int off = pga[i]->off & ~CFS_PAGE_MASK;
1194 memcpy(ptr + off, "bad1", min(4, nob));
1195 kunmap(pga[i]->pg);
1196 }
1197 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1198 pga[i]->off & ~CFS_PAGE_MASK,
1199 count);
1200 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1201 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1202
1203 nob -= pga[i]->count;
1204 pg_count--;
1205 i++;
1206 }
1207
1208 bufsize = 4;
1209 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1210
1211 if (err)
1212 cfs_crypto_hash_final(hdesc, NULL, NULL);
1213
1214
1215
1216 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1217 cksum++;
1218
1219 return cksum;
1220}
1221
1222static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1223 struct lov_stripe_md *lsm, obd_count page_count,
1224 struct brw_page **pga,
1225 struct ptlrpc_request **reqp,
1226 struct obd_capa *ocapa, int reserve,
1227 int resend)
1228{
1229 struct ptlrpc_request *req;
1230 struct ptlrpc_bulk_desc *desc;
1231 struct ost_body *body;
1232 struct obd_ioobj *ioobj;
1233 struct niobuf_remote *niobuf;
1234 int niocount, i, requested_nob, opc, rc;
1235 struct osc_brw_async_args *aa;
1236 struct req_capsule *pill;
1237 struct brw_page *pg_prev;
1238
1239 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1240 return -ENOMEM;
1241 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1242 return -EINVAL;
1243
1244 if ((cmd & OBD_BRW_WRITE) != 0) {
1245 opc = OST_WRITE;
1246 req = ptlrpc_request_alloc_pool(cli->cl_import,
1247 cli->cl_import->imp_rq_pool,
1248 &RQF_OST_BRW_WRITE);
1249 } else {
1250 opc = OST_READ;
1251 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1252 }
1253 if (req == NULL)
1254 return -ENOMEM;
1255
1256 for (niocount = i = 1; i < page_count; i++) {
1257 if (!can_merge_pages(pga[i - 1], pga[i]))
1258 niocount++;
1259 }
1260
1261 pill = &req->rq_pill;
1262 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1263 sizeof(*ioobj));
1264 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1265 niocount * sizeof(*niobuf));
1266 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1267
1268 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1269 if (rc) {
1270 ptlrpc_request_free(req);
1271 return rc;
1272 }
1273 req->rq_request_portal = OST_IO_PORTAL;
1274 ptlrpc_at_set_req_timeout(req);
1275
1276
1277 req->rq_no_retry_einprogress = 1;
1278
1279 desc = ptlrpc_prep_bulk_imp(req, page_count,
1280 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1281 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1282 OST_BULK_PORTAL);
1283
1284 if (desc == NULL)
1285 GOTO(out, rc = -ENOMEM);
1286
1287
1288 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1289 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1290 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1291 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1292
1293 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1294
1295 obdo_to_ioobj(oa, ioobj);
1296 ioobj->ioo_bufcnt = niocount;
1297
1298
1299
1300
1301
1302 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1303 osc_pack_capa(req, body, ocapa);
1304 LASSERT(page_count > 0);
1305 pg_prev = pga[0];
1306 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1307 struct brw_page *pg = pga[i];
1308 int poff = pg->off & ~CFS_PAGE_MASK;
1309
1310 LASSERT(pg->count > 0);
1311
1312 LASSERTF(page_count == 1 ||
1313 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1314 ergo(i > 0 && i < page_count - 1,
1315 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1316 ergo(i == page_count - 1, poff == 0)),
1317 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1318 i, page_count, pg, pg->off, pg->count);
1319 LASSERTF(i == 0 || pg->off > pg_prev->off,
1320 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1321 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1322 i, page_count,
1323 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1324 pg_prev->pg, page_private(pg_prev->pg),
1325 pg_prev->pg->index, pg_prev->off);
1326 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1327 (pg->flag & OBD_BRW_SRVLOCK));
1328
1329 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1330 requested_nob += pg->count;
1331
1332 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1333 niobuf--;
1334 niobuf->len += pg->count;
1335 } else {
1336 niobuf->offset = pg->off;
1337 niobuf->len = pg->count;
1338 niobuf->flags = pg->flag;
1339 }
1340 pg_prev = pg;
1341 }
1342
1343 LASSERTF((void *)(niobuf - niocount) ==
1344 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1345 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1346 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1347
1348 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1349 if (resend) {
1350 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1351 body->oa.o_valid |= OBD_MD_FLFLAGS;
1352 body->oa.o_flags = 0;
1353 }
1354 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1355 }
1356
1357 if (osc_should_shrink_grant(cli))
1358 osc_shrink_grant_local(cli, &body->oa);
1359
1360
1361 if (opc == OST_WRITE) {
1362 if (cli->cl_checksum &&
1363 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1364
1365
1366 cksum_type_t cksum_type = cli->cl_cksum_type;
1367
1368 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1369 oa->o_flags &= OBD_FL_LOCAL_MASK;
1370 body->oa.o_flags = 0;
1371 }
1372 body->oa.o_flags |= cksum_type_pack(cksum_type);
1373 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1374 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1375 page_count, pga,
1376 OST_WRITE,
1377 cksum_type);
1378 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1379 body->oa.o_cksum);
1380
1381 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1382 oa->o_flags |= cksum_type_pack(cksum_type);
1383 } else {
1384
1385
1386 oa->o_valid &= ~OBD_MD_FLCKSUM;
1387 }
1388 oa->o_cksum = body->oa.o_cksum;
1389
1390 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1391 sizeof(__u32) * niocount);
1392 } else {
1393 if (cli->cl_checksum &&
1394 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1395 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1396 body->oa.o_flags = 0;
1397 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1398 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1399 }
1400 }
1401 ptlrpc_request_set_replen(req);
1402
1403 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1404 aa = ptlrpc_req_async_args(req);
1405 aa->aa_oa = oa;
1406 aa->aa_requested_nob = requested_nob;
1407 aa->aa_nio_count = niocount;
1408 aa->aa_page_count = page_count;
1409 aa->aa_resends = 0;
1410 aa->aa_ppga = pga;
1411 aa->aa_cli = cli;
1412 INIT_LIST_HEAD(&aa->aa_oaps);
1413 if (ocapa && reserve)
1414 aa->aa_ocapa = capa_get(ocapa);
1415
1416 *reqp = req;
1417 return 0;
1418
1419 out:
1420 ptlrpc_req_finished(req);
1421 return rc;
1422}
1423
1424static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1425 __u32 client_cksum, __u32 server_cksum, int nob,
1426 obd_count page_count, struct brw_page **pga,
1427 cksum_type_t client_cksum_type)
1428{
1429 __u32 new_cksum;
1430 char *msg;
1431 cksum_type_t cksum_type;
1432
1433 if (server_cksum == client_cksum) {
1434 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1435 return 0;
1436 }
1437
1438 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1439 oa->o_flags : 0);
1440 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1441 cksum_type);
1442
1443 if (cksum_type != client_cksum_type)
1444 msg = "the server did not use the checksum type specified in "
1445 "the original request - likely a protocol problem";
1446 else if (new_cksum == server_cksum)
1447 msg = "changed on the client after we checksummed it - "
1448 "likely false positive due to mmap IO (bug 11742)";
1449 else if (new_cksum == client_cksum)
1450 msg = "changed in transit before arrival at OST";
1451 else
1452 msg = "changed in transit AND doesn't match the original - "
1453 "likely false positive due to mmap IO (bug 11742)";
1454
1455 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1456 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1457 msg, libcfs_nid2str(peer->nid),
1458 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1459 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1460 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1461 POSTID(&oa->o_oi), pga[0]->off,
1462 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1463 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1464 "client csum now %x\n", client_cksum, client_cksum_type,
1465 server_cksum, cksum_type, new_cksum);
1466 return 1;
1467}
1468
1469
1470static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1471{
1472 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473 const lnet_process_id_t *peer =
1474 &req->rq_import->imp_connection->c_peer;
1475 struct client_obd *cli = aa->aa_cli;
1476 struct ost_body *body;
1477 __u32 client_cksum = 0;
1478
1479 if (rc < 0 && rc != -EDQUOT) {
1480 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1481 return rc;
1482 }
1483
1484 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1485 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1486 if (body == NULL) {
1487 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1488 return -EPROTO;
1489 }
1490
1491
1492 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1493 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1494 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1495
1496 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1497 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1498 body->oa.o_flags);
1499 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1500 }
1501
1502 osc_update_grant(cli, body);
1503
1504 if (rc < 0)
1505 return rc;
1506
1507 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1508 client_cksum = aa->aa_oa->o_cksum;
1509
1510 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1511 if (rc > 0) {
1512 CERROR("Unexpected +ve rc %d\n", rc);
1513 return -EPROTO;
1514 }
1515 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1516
1517 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1518 return -EAGAIN;
1519
1520 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1521 check_write_checksum(&body->oa, peer, client_cksum,
1522 body->oa.o_cksum, aa->aa_requested_nob,
1523 aa->aa_page_count, aa->aa_ppga,
1524 cksum_type_unpack(aa->aa_oa->o_flags)))
1525 return -EAGAIN;
1526
1527 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1528 aa->aa_page_count, aa->aa_ppga);
1529 GOTO(out, rc);
1530 }
1531
1532
1533
1534
1535 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1536 if (rc < 0)
1537 GOTO(out, rc = -EAGAIN);
1538
1539 if (rc > aa->aa_requested_nob) {
1540 CERROR("Unexpected rc %d (%d requested)\n", rc,
1541 aa->aa_requested_nob);
1542 return -EPROTO;
1543 }
1544
1545 if (rc != req->rq_bulk->bd_nob_transferred) {
1546 CERROR ("Unexpected rc %d (%d transferred)\n",
1547 rc, req->rq_bulk->bd_nob_transferred);
1548 return (-EPROTO);
1549 }
1550
1551 if (rc < aa->aa_requested_nob)
1552 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1553
1554 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1555 static int cksum_counter;
1556 __u32 server_cksum = body->oa.o_cksum;
1557 char *via;
1558 char *router;
1559 cksum_type_t cksum_type;
1560
1561 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1562 body->oa.o_flags : 0);
1563 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1564 aa->aa_ppga, OST_READ,
1565 cksum_type);
1566
1567 if (peer->nid == req->rq_bulk->bd_sender) {
1568 via = router = "";
1569 } else {
1570 via = " via ";
1571 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1572 }
1573
1574 if (server_cksum == ~0 && rc > 0) {
1575 CERROR("Protocol error: server %s set the 'checksum' "
1576 "bit, but didn't send a checksum. Not fatal, "
1577 "but please notify on http://bugs.whamcloud.com/\n",
1578 libcfs_nid2str(peer->nid));
1579 } else if (server_cksum != client_cksum) {
1580 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1581 "%s%s%s inode "DFID" object "DOSTID
1582 " extent ["LPU64"-"LPU64"]\n",
1583 req->rq_import->imp_obd->obd_name,
1584 libcfs_nid2str(peer->nid),
1585 via, router,
1586 body->oa.o_valid & OBD_MD_FLFID ?
1587 body->oa.o_parent_seq : (__u64)0,
1588 body->oa.o_valid & OBD_MD_FLFID ?
1589 body->oa.o_parent_oid : 0,
1590 body->oa.o_valid & OBD_MD_FLFID ?
1591 body->oa.o_parent_ver : 0,
1592 POSTID(&body->oa.o_oi),
1593 aa->aa_ppga[0]->off,
1594 aa->aa_ppga[aa->aa_page_count-1]->off +
1595 aa->aa_ppga[aa->aa_page_count-1]->count -
1596 1);
1597 CERROR("client %x, server %x, cksum_type %x\n",
1598 client_cksum, server_cksum, cksum_type);
1599 cksum_counter = 0;
1600 aa->aa_oa->o_cksum = client_cksum;
1601 rc = -EAGAIN;
1602 } else {
1603 cksum_counter++;
1604 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1605 rc = 0;
1606 }
1607 } else if (unlikely(client_cksum)) {
1608 static int cksum_missed;
1609
1610 cksum_missed++;
1611 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1612 CERROR("Checksum %u requested from %s but not sent\n",
1613 cksum_missed, libcfs_nid2str(peer->nid));
1614 } else {
1615 rc = 0;
1616 }
1617out:
1618 if (rc >= 0)
1619 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1620 aa->aa_oa, &body->oa);
1621
1622 return rc;
1623}
1624
1625static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1626 struct lov_stripe_md *lsm,
1627 obd_count page_count, struct brw_page **pga,
1628 struct obd_capa *ocapa)
1629{
1630 struct ptlrpc_request *req;
1631 int rc;
1632 wait_queue_head_t waitq;
1633 int generation, resends = 0;
1634 struct l_wait_info lwi;
1635
1636 init_waitqueue_head(&waitq);
1637 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1638
1639restart_bulk:
1640 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1641 page_count, pga, &req, ocapa, 0, resends);
1642 if (rc != 0)
1643 return (rc);
1644
1645 if (resends) {
1646 req->rq_generation_set = 1;
1647 req->rq_import_generation = generation;
1648 req->rq_sent = cfs_time_current_sec() + resends;
1649 }
1650
1651 rc = ptlrpc_queue_wait(req);
1652
1653 if (rc == -ETIMEDOUT && req->rq_resend) {
1654 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1655 ptlrpc_req_finished(req);
1656 goto restart_bulk;
1657 }
1658
1659 rc = osc_brw_fini_request(req, rc);
1660
1661 ptlrpc_req_finished(req);
1662
1663
1664 if (osc_recoverable_error(rc)) {
1665 resends++;
1666 if (rc != -EINPROGRESS &&
1667 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1668 CERROR("%s: too many resend retries for object: "
1669 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1670 POSTID(&oa->o_oi), rc);
1671 goto out;
1672 }
1673 if (generation !=
1674 exp->exp_obd->u.cli.cl_import->imp_generation) {
1675 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1676 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1677 POSTID(&oa->o_oi), rc);
1678 goto out;
1679 }
1680
1681 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1682 NULL);
1683 l_wait_event(waitq, 0, &lwi);
1684
1685 goto restart_bulk;
1686 }
1687out:
1688 if (rc == -EAGAIN || rc == -EINPROGRESS)
1689 rc = -EIO;
1690 return rc;
1691}
1692
1693static int osc_brw_redo_request(struct ptlrpc_request *request,
1694 struct osc_brw_async_args *aa, int rc)
1695{
1696 struct ptlrpc_request *new_req;
1697 struct osc_brw_async_args *new_aa;
1698 struct osc_async_page *oap;
1699
1700 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1701 "redo for recoverable error %d", rc);
1702
1703 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1704 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1705 aa->aa_cli, aa->aa_oa,
1706 NULL ,
1707 aa->aa_page_count, aa->aa_ppga,
1708 &new_req, aa->aa_ocapa, 0, 1);
1709 if (rc)
1710 return rc;
1711
1712 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1713 if (oap->oap_request != NULL) {
1714 LASSERTF(request == oap->oap_request,
1715 "request %p != oap_request %p\n",
1716 request, oap->oap_request);
1717 if (oap->oap_interrupted) {
1718 ptlrpc_req_finished(new_req);
1719 return -EINTR;
1720 }
1721 }
1722 }
1723
1724
1725 aa->aa_resends++;
1726 new_req->rq_interpret_reply = request->rq_interpret_reply;
1727 new_req->rq_async_args = request->rq_async_args;
1728
1729
1730 if (aa->aa_resends > new_req->rq_timeout)
1731 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1732 else
1733 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1734 new_req->rq_generation_set = 1;
1735 new_req->rq_import_generation = request->rq_import_generation;
1736
1737 new_aa = ptlrpc_req_async_args(new_req);
1738
1739 INIT_LIST_HEAD(&new_aa->aa_oaps);
1740 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1741 INIT_LIST_HEAD(&new_aa->aa_exts);
1742 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1743 new_aa->aa_resends = aa->aa_resends;
1744
1745 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1746 if (oap->oap_request) {
1747 ptlrpc_req_finished(oap->oap_request);
1748 oap->oap_request = ptlrpc_request_addref(new_req);
1749 }
1750 }
1751
1752 new_aa->aa_ocapa = aa->aa_ocapa;
1753 aa->aa_ocapa = NULL;
1754
1755
1756
1757
1758
1759 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1760
1761 DEBUG_REQ(D_INFO, new_req, "new request");
1762 return 0;
1763}
1764
1765
1766
1767
1768
1769
1770
1771
1772static void sort_brw_pages(struct brw_page **array, int num)
1773{
1774 int stride, i, j;
1775 struct brw_page *tmp;
1776
1777 if (num == 1)
1778 return;
1779 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1780 ;
1781
1782 do {
1783 stride /= 3;
1784 for (i = stride ; i < num ; i++) {
1785 tmp = array[i];
1786 j = i;
1787 while (j >= stride && array[j - stride]->off > tmp->off) {
1788 array[j] = array[j - stride];
1789 j -= stride;
1790 }
1791 array[j] = tmp;
1792 }
1793 } while (stride > 1);
1794}
1795
1796static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1797{
1798 int count = 1;
1799 int offset;
1800 int i = 0;
1801
1802 LASSERT (pages > 0);
1803 offset = pg[i]->off & ~CFS_PAGE_MASK;
1804
1805 for (;;) {
1806 pages--;
1807 if (pages == 0)
1808 return count;
1809
1810 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1811 return count;
1812
1813 i++;
1814 offset = pg[i]->off & ~CFS_PAGE_MASK;
1815 if (offset != 0)
1816 return count;
1817
1818 count++;
1819 }
1820}
1821
1822static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1823{
1824 struct brw_page **ppga;
1825 int i;
1826
1827 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1828 if (ppga == NULL)
1829 return NULL;
1830
1831 for (i = 0; i < count; i++)
1832 ppga[i] = pga + i;
1833 return ppga;
1834}
1835
1836static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1837{
1838 LASSERT(ppga != NULL);
1839 OBD_FREE(ppga, sizeof(*ppga) * count);
1840}
1841
1842static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1843 obd_count page_count, struct brw_page *pga,
1844 struct obd_trans_info *oti)
1845{
1846 struct obdo *saved_oa = NULL;
1847 struct brw_page **ppga, **orig;
1848 struct obd_import *imp = class_exp2cliimp(exp);
1849 struct client_obd *cli;
1850 int rc, page_count_orig;
1851
1852 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1853 cli = &imp->imp_obd->u.cli;
1854
1855 if (cmd & OBD_BRW_CHECK) {
1856
1857
1858
1859 if (imp->imp_invalid)
1860 return -EIO;
1861 return 0;
1862 }
1863
1864
1865 LASSERT(cli->cl_max_pages_per_rpc);
1866
1867 rc = 0;
1868
1869 orig = ppga = osc_build_ppga(pga, page_count);
1870 if (ppga == NULL)
1871 return -ENOMEM;
1872 page_count_orig = page_count;
1873
1874 sort_brw_pages(ppga, page_count);
1875 while (page_count) {
1876 obd_count pages_per_brw;
1877
1878 if (page_count > cli->cl_max_pages_per_rpc)
1879 pages_per_brw = cli->cl_max_pages_per_rpc;
1880 else
1881 pages_per_brw = page_count;
1882
1883 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1884
1885 if (saved_oa != NULL) {
1886
1887 *oinfo->oi_oa = *saved_oa;
1888 } else if (page_count > pages_per_brw) {
1889
1890 OBDO_ALLOC(saved_oa);
1891 if (saved_oa == NULL)
1892 GOTO(out, rc = -ENOMEM);
1893 *saved_oa = *oinfo->oi_oa;
1894 }
1895
1896 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1897 pages_per_brw, ppga, oinfo->oi_capa);
1898
1899 if (rc != 0)
1900 break;
1901
1902 page_count -= pages_per_brw;
1903 ppga += pages_per_brw;
1904 }
1905
1906out:
1907 osc_release_ppga(orig, page_count_orig);
1908
1909 if (saved_oa != NULL)
1910 OBDO_FREE(saved_oa);
1911
1912 return rc;
1913}
1914
1915static int brw_interpret(const struct lu_env *env,
1916 struct ptlrpc_request *req, void *data, int rc)
1917{
1918 struct osc_brw_async_args *aa = data;
1919 struct osc_extent *ext;
1920 struct osc_extent *tmp;
1921 struct cl_object *obj = NULL;
1922 struct client_obd *cli = aa->aa_cli;
1923
1924 rc = osc_brw_fini_request(req, rc);
1925 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1926
1927
1928 if (osc_recoverable_error(rc)) {
1929 if (req->rq_import_generation !=
1930 req->rq_import->imp_generation) {
1931 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1932 ""DOSTID", rc = %d.\n",
1933 req->rq_import->imp_obd->obd_name,
1934 POSTID(&aa->aa_oa->o_oi), rc);
1935 } else if (rc == -EINPROGRESS ||
1936 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1937 rc = osc_brw_redo_request(req, aa, rc);
1938 } else {
1939 CERROR("%s: too many resent retries for object: "
1940 ""LPU64":"LPU64", rc = %d.\n",
1941 req->rq_import->imp_obd->obd_name,
1942 POSTID(&aa->aa_oa->o_oi), rc);
1943 }
1944
1945 if (rc == 0)
1946 return 0;
1947 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1948 rc = -EIO;
1949 }
1950
1951 if (aa->aa_ocapa) {
1952 capa_put(aa->aa_ocapa);
1953 aa->aa_ocapa = NULL;
1954 }
1955
1956 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1957 if (obj == NULL && rc == 0) {
1958 obj = osc2cl(ext->oe_obj);
1959 cl_object_get(obj);
1960 }
1961
1962 list_del_init(&ext->oe_link);
1963 osc_extent_finish(env, ext, 1, rc);
1964 }
1965 LASSERT(list_empty(&aa->aa_exts));
1966 LASSERT(list_empty(&aa->aa_oaps));
1967
1968 if (obj != NULL) {
1969 struct obdo *oa = aa->aa_oa;
1970 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1971 unsigned long valid = 0;
1972
1973 LASSERT(rc == 0);
1974 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1975 attr->cat_blocks = oa->o_blocks;
1976 valid |= CAT_BLOCKS;
1977 }
1978 if (oa->o_valid & OBD_MD_FLMTIME) {
1979 attr->cat_mtime = oa->o_mtime;
1980 valid |= CAT_MTIME;
1981 }
1982 if (oa->o_valid & OBD_MD_FLATIME) {
1983 attr->cat_atime = oa->o_atime;
1984 valid |= CAT_ATIME;
1985 }
1986 if (oa->o_valid & OBD_MD_FLCTIME) {
1987 attr->cat_ctime = oa->o_ctime;
1988 valid |= CAT_CTIME;
1989 }
1990 if (valid != 0) {
1991 cl_object_attr_lock(obj);
1992 cl_object_attr_set(env, obj, attr, valid);
1993 cl_object_attr_unlock(obj);
1994 }
1995 cl_object_put(env, obj);
1996 }
1997 OBDO_FREE(aa->aa_oa);
1998
1999 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2000 req->rq_bulk->bd_nob_transferred);
2001 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2002 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2003
2004 client_obd_list_lock(&cli->cl_loi_list_lock);
2005
2006
2007
2008 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2009 cli->cl_w_in_flight--;
2010 else
2011 cli->cl_r_in_flight--;
2012 osc_wake_cache_waiters(cli);
2013 client_obd_list_unlock(&cli->cl_loi_list_lock);
2014
2015 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2016 return rc;
2017}
2018
2019
2020
2021
2022
2023
2024int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2025 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2026{
2027 struct ptlrpc_request *req = NULL;
2028 struct osc_extent *ext;
2029 struct brw_page **pga = NULL;
2030 struct osc_brw_async_args *aa = NULL;
2031 struct obdo *oa = NULL;
2032 struct osc_async_page *oap;
2033 struct osc_async_page *tmp;
2034 struct cl_req *clerq = NULL;
2035 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2036 CRT_READ;
2037 struct ldlm_lock *lock = NULL;
2038 struct cl_req_attr *crattr = NULL;
2039 obd_off starting_offset = OBD_OBJECT_EOF;
2040 obd_off ending_offset = 0;
2041 int mpflag = 0;
2042 int mem_tight = 0;
2043 int page_count = 0;
2044 int i;
2045 int rc;
2046 LIST_HEAD(rpc_list);
2047
2048 LASSERT(!list_empty(ext_list));
2049
2050
2051 list_for_each_entry(ext, ext_list, oe_link) {
2052 LASSERT(ext->oe_state == OES_RPC);
2053 mem_tight |= ext->oe_memalloc;
2054 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2055 ++page_count;
2056 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2057 if (starting_offset > oap->oap_obj_off)
2058 starting_offset = oap->oap_obj_off;
2059 else
2060 LASSERT(oap->oap_page_off == 0);
2061 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2062 ending_offset = oap->oap_obj_off +
2063 oap->oap_count;
2064 else
2065 LASSERT(oap->oap_page_off + oap->oap_count ==
2066 PAGE_CACHE_SIZE);
2067 }
2068 }
2069
2070 if (mem_tight)
2071 mpflag = cfs_memory_pressure_get_and_set();
2072
2073 OBD_ALLOC(crattr, sizeof(*crattr));
2074 if (crattr == NULL)
2075 GOTO(out, rc = -ENOMEM);
2076
2077 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2078 if (pga == NULL)
2079 GOTO(out, rc = -ENOMEM);
2080
2081 OBDO_ALLOC(oa);
2082 if (oa == NULL)
2083 GOTO(out, rc = -ENOMEM);
2084
2085 i = 0;
2086 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2087 struct cl_page *page = oap2cl_page(oap);
2088 if (clerq == NULL) {
2089 clerq = cl_req_alloc(env, page, crt,
2090 1 );
2091 if (IS_ERR(clerq))
2092 GOTO(out, rc = PTR_ERR(clerq));
2093 lock = oap->oap_ldlm_lock;
2094 }
2095 if (mem_tight)
2096 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2097 pga[i] = &oap->oap_brw_page;
2098 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2099 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2100 pga[i]->pg, page_index(oap->oap_page), oap,
2101 pga[i]->flag);
2102 i++;
2103 cl_req_page_add(env, clerq, page);
2104 }
2105
2106
2107 LASSERT(clerq != NULL);
2108 crattr->cra_oa = oa;
2109 cl_req_attr_set(env, clerq, crattr, ~0ULL);
2110 if (lock) {
2111 oa->o_handle = lock->l_remote_handle;
2112 oa->o_valid |= OBD_MD_FLHANDLE;
2113 }
2114
2115 rc = cl_req_prep(env, clerq);
2116 if (rc != 0) {
2117 CERROR("cl_req_prep failed: %d\n", rc);
2118 GOTO(out, rc);
2119 }
2120
2121 sort_brw_pages(pga, page_count);
2122 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2123 pga, &req, crattr->cra_capa, 1, 0);
2124 if (rc != 0) {
2125 CERROR("prep_req failed: %d\n", rc);
2126 GOTO(out, rc);
2127 }
2128
2129 req->rq_interpret_reply = brw_interpret;
2130
2131 if (mem_tight != 0)
2132 req->rq_memalloc = 1;
2133
2134
2135
2136
2137
2138
2139 cl_req_attr_set(env, clerq, crattr,
2140 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2141
2142 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2143
2144 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2145 aa = ptlrpc_req_async_args(req);
2146 INIT_LIST_HEAD(&aa->aa_oaps);
2147 list_splice_init(&rpc_list, &aa->aa_oaps);
2148 INIT_LIST_HEAD(&aa->aa_exts);
2149 list_splice_init(ext_list, &aa->aa_exts);
2150 aa->aa_clerq = clerq;
2151
2152
2153
2154 tmp = NULL;
2155 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2156
2157 if (tmp == NULL)
2158 tmp = oap;
2159 if (oap->oap_interrupted && !req->rq_intr) {
2160 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2161 oap, req);
2162 ptlrpc_mark_interrupted(req);
2163 }
2164 }
2165 if (tmp != NULL)
2166 tmp->oap_request = ptlrpc_request_addref(req);
2167
2168 client_obd_list_lock(&cli->cl_loi_list_lock);
2169 starting_offset >>= PAGE_CACHE_SHIFT;
2170 if (cmd == OBD_BRW_READ) {
2171 cli->cl_r_in_flight++;
2172 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2173 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2174 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2175 starting_offset + 1);
2176 } else {
2177 cli->cl_w_in_flight++;
2178 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2179 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2180 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2181 starting_offset + 1);
2182 }
2183 client_obd_list_unlock(&cli->cl_loi_list_lock);
2184
2185 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2186 page_count, aa, cli->cl_r_in_flight,
2187 cli->cl_w_in_flight);
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201 ptlrpcd_add_req(req, pol, -1);
2202 rc = 0;
2203
2204out:
2205 if (mem_tight != 0)
2206 cfs_memory_pressure_restore(mpflag);
2207
2208 if (crattr != NULL) {
2209 capa_put(crattr->cra_capa);
2210 OBD_FREE(crattr, sizeof(*crattr));
2211 }
2212
2213 if (rc != 0) {
2214 LASSERT(req == NULL);
2215
2216 if (oa)
2217 OBDO_FREE(oa);
2218 if (pga)
2219 OBD_FREE(pga, sizeof(*pga) * page_count);
2220
2221
2222 while (!list_empty(ext_list)) {
2223 ext = list_entry(ext_list->next, struct osc_extent,
2224 oe_link);
2225 list_del_init(&ext->oe_link);
2226 osc_extent_finish(env, ext, 0, rc);
2227 }
2228 if (clerq && !IS_ERR(clerq))
2229 cl_req_completion(env, clerq, rc);
2230 }
2231 return rc;
2232}
2233
2234static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2235 struct ldlm_enqueue_info *einfo)
2236{
2237 void *data = einfo->ei_cbdata;
2238 int set = 0;
2239
2240 LASSERT(lock != NULL);
2241 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2242 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2243 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2244 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2245
2246 lock_res_and_lock(lock);
2247 spin_lock(&osc_ast_guard);
2248
2249 if (lock->l_ast_data == NULL)
2250 lock->l_ast_data = data;
2251 if (lock->l_ast_data == data)
2252 set = 1;
2253
2254 spin_unlock(&osc_ast_guard);
2255 unlock_res_and_lock(lock);
2256
2257 return set;
2258}
2259
2260static int osc_set_data_with_check(struct lustre_handle *lockh,
2261 struct ldlm_enqueue_info *einfo)
2262{
2263 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2264 int set = 0;
2265
2266 if (lock != NULL) {
2267 set = osc_set_lock_data_with_check(lock, einfo);
2268 LDLM_LOCK_PUT(lock);
2269 } else
2270 CERROR("lockh %p, data %p - client evicted?\n",
2271 lockh, einfo->ei_cbdata);
2272 return set;
2273}
2274
2275static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2276 ldlm_iterator_t replace, void *data)
2277{
2278 struct ldlm_res_id res_id;
2279 struct obd_device *obd = class_exp2obd(exp);
2280
2281 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2282 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2283 return 0;
2284}
2285
2286
2287
2288
2289
2290static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2291 ldlm_iterator_t replace, void *data)
2292{
2293 struct ldlm_res_id res_id;
2294 struct obd_device *obd = class_exp2obd(exp);
2295 int rc = 0;
2296
2297 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2298 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2299 if (rc == LDLM_ITER_STOP)
2300 return(1);
2301 if (rc == LDLM_ITER_CONTINUE)
2302 return(0);
2303 return(rc);
2304}
2305
2306static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2307 obd_enqueue_update_f upcall, void *cookie,
2308 __u64 *flags, int agl, int rc)
2309{
2310 int intent = *flags & LDLM_FL_HAS_INTENT;
2311
2312 if (intent) {
2313
2314 if (rc == ELDLM_LOCK_ABORTED) {
2315 struct ldlm_reply *rep;
2316 rep = req_capsule_server_get(&req->rq_pill,
2317 &RMF_DLM_REP);
2318
2319 LASSERT(rep != NULL);
2320 rep->lock_policy_res1 =
2321 ptlrpc_status_ntoh(rep->lock_policy_res1);
2322 if (rep->lock_policy_res1)
2323 rc = rep->lock_policy_res1;
2324 }
2325 }
2326
2327 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2328 (rc == 0)) {
2329 *flags |= LDLM_FL_LVB_READY;
2330 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2331 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2332 }
2333
2334
2335 rc = (*upcall)(cookie, rc);
2336 return rc;
2337}
2338
/*
 * Reply-interpret callback of an asynchronous lock enqueue; installed as
 * req->rq_interpret_reply by osc_enqueue_base().
 *
 * Completes the enqueue through ldlm_cli_enqueue_fini(), runs the caller's
 * upcall through osc_enqueue_fini() and then drops the lock references that
 * were held while the request was being processed.
 *
 * \param env  execution environment (not used in this function)
 * \param req  enqueue request whose reply is interpreted
 * \param aa   async arguments stored in the request by osc_enqueue_base()
 * \param rc   status of the request so far
 *
 * \retval value returned by osc_enqueue_fini()
 */
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle handle;
	__u32 mode;
	struct ost_lvb *lvb;
	__u32 lvb_len;
	__u64 *flags = aa->oa_flags;

	/* Work on a private copy of the lock handle from here on
	 * (NOTE(review): presumably because the upcall may release the
	 * storage behind aa->oa_lockh - confirm). */
	lustre_handle_copy(&handle, aa->oa_lockh);
	mode = aa->oa_ei->ei_mode;

	/* Pin the lock object; it is only asserted non-NULL after the upcall
	 * has run and is released by LDLM_LOCK_PUT() at the very end. */
	lock = ldlm_handle2lock(&handle);

	/* Extra mode reference for the duration of this function; paired with
	 * the unconditional ldlm_lock_decref() below. */
	ldlm_lock_addref(&handle, mode);

	/* Fault-injection point. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	/* An aborted AGL enqueue hands no LVB buffer to the fini call. */
	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
		lvb = NULL;
		lvb_len = 0;
	} else {
		lvb = aa->oa_lvb;
		lvb_len = sizeof(*aa->oa_lvb);
	}

	/* Finish the LDLM side of the enqueue. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
				   mode, flags, lvb, lvb_len, &handle, rc);
	/* Finish the OSC side; this is where the upcall is invoked. */
	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
			      flags, aa->oa_agl, rc);

	/* Fault-injection point. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* On success with a still valid handle, drop one mode reference
	 * (NOTE(review): presumably the one taken on behalf of the async
	 * request by ldlm_cli_enqueue() - confirm). */
	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
		ldlm_lock_decref(&handle, mode);

	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
		 aa->oa_lockh, req, aa);
	/* Release the extra reference taken above, then unpin the lock. */
	ldlm_lock_decref(&handle, mode);
	LDLM_LOCK_PUT(lock);
	return rc;
}
2400
2401void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2402 struct lov_oinfo *loi, int flags,
2403 struct ost_lvb *lvb, __u32 mode, int rc)
2404{
2405 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2406
2407 if (rc == ELDLM_OK) {
2408 __u64 tmp;
2409
2410 LASSERT(lock != NULL);
2411 loi->loi_lvb = *lvb;
2412 tmp = loi->loi_lvb.lvb_size;
2413
2414
2415 if (tmp > lock->l_policy_data.l_extent.end)
2416 tmp = lock->l_policy_data.l_extent.end + 1;
2417 if (tmp >= loi->loi_kms) {
2418 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2419 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2420 loi_kms_set(loi, tmp);
2421 } else {
2422 LDLM_DEBUG(lock, "lock acquired, setting rss="
2423 LPU64"; leaving kms="LPU64", end="LPU64,
2424 loi->loi_lvb.lvb_size, loi->loi_kms,
2425 lock->l_policy_data.l_extent.end);
2426 }
2427 ldlm_lock_allow_match(lock);
2428 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2429 LASSERT(lock != NULL);
2430 loi->loi_lvb = *lvb;
2431 ldlm_lock_allow_match(lock);
2432 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2433 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2434 rc = ELDLM_OK;
2435 }
2436
2437 if (lock != NULL) {
2438 if (rc != ELDLM_OK)
2439 ldlm_lock_fail_match(lock);
2440
2441 LDLM_LOCK_PUT(lock);
2442 }
2443}
2444EXPORT_SYMBOL(osc_update_enqueue);
2445
/*
 * Sentinel "request set": not a real object, only a distinguishable pointer
 * value.  osc_enqueue_base() compares its rqset argument against it and, on
 * a match, hands the request to ptlrpcd instead of adding it to a set.
 */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2447
2448
2449
2450
2451
2452
2453
2454
/*
 * Obtain a lock on \a res_id, reusing an already cached lock when one
 * matches and enqueuing a new one otherwise.
 *
 * \param exp        export the lock is requested through
 * \param res_id     resource to lock
 * \param flags      LDLM flags (in/out): LDLM_FL_LVB_READY is set when a
 *                   cached lock is reused, LDLM_FL_BLOCK_GRANTED is cleared
 *                   before a new enqueue
 * \param policy     requested extent; start is rounded down and end rounded
 *                   up in place using CFS_PAGE_MASK
 * \param lvb        buffer passed to ldlm_cli_enqueue() for the lock value
 *                   block
 * \param kms_valid  when zero, cached locks are not looked up at all
 * \param upcall     completion callback, called with \a cookie and a status
 * \param cookie     opaque argument for \a upcall
 * \param einfo      enqueue description (lock mode, type, ...)
 * \param lockh      receives the handle of the matched or enqueued lock
 * \param rqset      NULL for a synchronous enqueue; otherwise the request is
 *                   added to this set, or given to ptlrpcd when it equals
 *                   PTLRPCD_SET
 * \param async      passed through to ldlm_cli_enqueue()
 * \param agl        non-zero for AGL requests (NOTE(review): presumably
 *                   "asynchronous glimpse lock" - confirm)
 *
 * \retval ELDLM_OK    a cached lock was reused; the upcall has already run
 * \retval -ECANCELED  AGL request matched a lock without LDLM_FL_LVB_READY
 * \retval -ENOMEM     the intent request could not be allocated
 * \retval other       error from ldlm_prep_enqueue_req(); for \a rqset
 *                     requests the result of ldlm_cli_enqueue(); otherwise
 *                     the result of osc_enqueue_fini()
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	/* Non-AGL callers only accept cached locks whose LVB is ready. */
	int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Widen the requested extent: clear the low bits of start and set
	 * the low bits of end, as selected by ~CFS_PAGE_MASK. */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Without a valid kms, skip the cache lookup and always enqueue. */
	if (!kms_valid)
		goto no_match;

	/* A request for LCK_PR is also satisfied by a cached LCK_PW lock. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* AGL matched a lock whose LVB is not ready yet: give
			 * the match reference back and tell the caller. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* Synchronous caller whose requested mode differs
			 * from the matched one: take a LCK_PR reference now;
			 * the LCK_PW reference is dropped after the upcall. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* intentionally empty */
			}

			/* The lock is already held and referenced: complete
			 * immediately. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* requests using a set do not keep the match
				 * reference */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		} else {
			/* The matched lock cannot be used for this request:
			 * release it and enqueue a new lock instead. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

 no_match:
	if (intent) {
		LIST_HEAD(cancels);
		/* Intent enqueues use a request prepared here, with room
		 * reserved for the LVB in the reply. */
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* LDLM_FL_BLOCK_GRANTED was only used for the match above; it is
	 * not passed on to the enqueue. */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			struct osc_enqueue_args *aa;
			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
			/* Stash everything osc_enqueue_interpret() needs in
			 * the request's async-argument area. */
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb = lvb;
			aa->oa_lockh = lockh;
			aa->oa_agl = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			/* the request allocated above is owned by us */
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	/* Synchronous path: run the upcall now. */
	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}
2598
2599static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2600 struct ldlm_enqueue_info *einfo,
2601 struct ptlrpc_request_set *rqset)
2602{
2603 struct ldlm_res_id res_id;
2604 int rc;
2605
2606 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2607 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2608 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2609 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2610 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2611 rqset, rqset != NULL, 0);
2612 return rc;
2613}
2614
2615int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2616 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2617 int *flags, void *data, struct lustre_handle *lockh,
2618 int unref)
2619{
2620 struct obd_device *obd = exp->exp_obd;
2621 int lflags = *flags;
2622 ldlm_mode_t rc;
2623
2624 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2625 return -EIO;
2626
2627
2628
2629 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2630 policy->l_extent.end |= ~CFS_PAGE_MASK;
2631
2632
2633
2634
2635
2636 rc = mode;
2637 if (mode == LCK_PR)
2638 rc |= LCK_PW;
2639 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2640 res_id, type, policy, rc, lockh, unref);
2641 if (rc) {
2642 if (data != NULL) {
2643 if (!osc_set_data_with_check(lockh, data)) {
2644 if (!(lflags & LDLM_FL_TEST_LOCK))
2645 ldlm_lock_decref(lockh, rc);
2646 return 0;
2647 }
2648 }
2649 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2650 ldlm_lock_addref(lockh, LCK_PR);
2651 ldlm_lock_decref(lockh, LCK_PW);
2652 }
2653 return rc;
2654 }
2655 return rc;
2656}
2657
2658int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2659{
2660 if (unlikely(mode == LCK_GROUP))
2661 ldlm_lock_decref_and_cancel(lockh, mode);
2662 else
2663 ldlm_lock_decref(lockh, mode);
2664
2665 return 0;
2666}
2667
2668static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2669 __u32 mode, struct lustre_handle *lockh)
2670{
2671 return osc_cancel_base(lockh, mode);
2672}
2673
2674static int osc_cancel_unused(struct obd_export *exp,
2675 struct lov_stripe_md *lsm,
2676 ldlm_cancel_flags_t flags,
2677 void *opaque)
2678{
2679 struct obd_device *obd = class_exp2obd(exp);
2680 struct ldlm_res_id res_id, *resp = NULL;
2681
2682 if (lsm != NULL) {
2683 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2684 resp = &res_id;
2685 }
2686
2687 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2688}
2689
2690static int osc_statfs_interpret(const struct lu_env *env,
2691 struct ptlrpc_request *req,
2692 struct osc_async_args *aa, int rc)
2693{
2694 struct obd_statfs *msfs;
2695
2696 if (rc == -EBADR)
2697
2698
2699
2700
2701
2702 return rc;
2703
2704 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2705 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2706 GOTO(out, rc = 0);
2707
2708 if (rc != 0)
2709 GOTO(out, rc);
2710
2711 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2712 if (msfs == NULL) {
2713 GOTO(out, rc = -EPROTO);
2714 }
2715
2716 *aa->aa_oi->oi_osfs = *msfs;
2717out:
2718 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719 return rc;
2720}
2721
2722static int osc_statfs_async(struct obd_export *exp,
2723 struct obd_info *oinfo, __u64 max_age,
2724 struct ptlrpc_request_set *rqset)
2725{
2726 struct obd_device *obd = class_exp2obd(exp);
2727 struct ptlrpc_request *req;
2728 struct osc_async_args *aa;
2729 int rc;
2730
2731
2732
2733
2734
2735
2736
2737 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2738 if (req == NULL)
2739 return -ENOMEM;
2740
2741 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2742 if (rc) {
2743 ptlrpc_request_free(req);
2744 return rc;
2745 }
2746 ptlrpc_request_set_replen(req);
2747 req->rq_request_portal = OST_CREATE_PORTAL;
2748 ptlrpc_at_set_req_timeout(req);
2749
2750 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2751
2752 req->rq_no_resend = 1;
2753 req->rq_no_delay = 1;
2754 }
2755
2756 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2757 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2758 aa = ptlrpc_req_async_args(req);
2759 aa->aa_oi = oinfo;
2760
2761 ptlrpc_set_add_req(rqset, req);
2762 return 0;
2763}
2764
2765static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2766 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2767{
2768 struct obd_device *obd = class_exp2obd(exp);
2769 struct obd_statfs *msfs;
2770 struct ptlrpc_request *req;
2771 struct obd_import *imp = NULL;
2772 int rc;
2773
2774
2775
2776 down_read(&obd->u.cli.cl_sem);
2777 if (obd->u.cli.cl_import)
2778 imp = class_import_get(obd->u.cli.cl_import);
2779 up_read(&obd->u.cli.cl_sem);
2780 if (!imp)
2781 return -ENODEV;
2782
2783
2784
2785
2786
2787
2788
2789 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2790
2791 class_import_put(imp);
2792
2793 if (req == NULL)
2794 return -ENOMEM;
2795
2796 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2797 if (rc) {
2798 ptlrpc_request_free(req);
2799 return rc;
2800 }
2801 ptlrpc_request_set_replen(req);
2802 req->rq_request_portal = OST_CREATE_PORTAL;
2803 ptlrpc_at_set_req_timeout(req);
2804
2805 if (flags & OBD_STATFS_NODELAY) {
2806
2807 req->rq_no_resend = 1;
2808 req->rq_no_delay = 1;
2809 }
2810
2811 rc = ptlrpc_queue_wait(req);
2812 if (rc)
2813 GOTO(out, rc);
2814
2815 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2816 if (msfs == NULL) {
2817 GOTO(out, rc = -EPROTO);
2818 }
2819
2820 *osfs = *msfs;
2821
2822 out:
2823 ptlrpc_req_finished(req);
2824 return rc;
2825}
2826
2827
2828
2829
2830
2831
2832
2833static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2834{
2835
2836 struct lov_user_md_v3 lum, *lumk;
2837 struct lov_user_ost_data_v1 *lmm_objects;
2838 int rc = 0, lum_size;
2839
2840 if (!lsm)
2841 return -ENODATA;
2842
2843
2844
2845 lum_size = sizeof(struct lov_user_md_v1);
2846 if (copy_from_user(&lum, lump, lum_size))
2847 return -EFAULT;
2848
2849 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2850 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2851 return -EINVAL;
2852
2853
2854 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2855 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2856 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2857
2858
2859
2860 if (lum.lmm_stripe_count > 0) {
2861 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2862 OBD_ALLOC(lumk, lum_size);
2863 if (!lumk)
2864 return -ENOMEM;
2865
2866 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2867 lmm_objects =
2868 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2869 else
2870 lmm_objects = &(lumk->lmm_objects[0]);
2871 lmm_objects->l_ost_oi = lsm->lsm_oi;
2872 } else {
2873 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2874 lumk = &lum;
2875 }
2876
2877 lumk->lmm_oi = lsm->lsm_oi;
2878 lumk->lmm_stripe_count = 1;
2879
2880 if (copy_to_user(lump, lumk, lum_size))
2881 rc = -EFAULT;
2882
2883 if (lumk != &lum)
2884 OBD_FREE(lumk, lum_size);
2885
2886 return rc;
2887}
2888
2889
2890static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2891 void *karg, void *uarg)
2892{
2893 struct obd_device *obd = exp->exp_obd;
2894 struct obd_ioctl_data *data = karg;
2895 int err = 0;
2896
2897 if (!try_module_get(THIS_MODULE)) {
2898 CERROR("Can't get module. Is it alive?");
2899 return -EINVAL;
2900 }
2901 switch (cmd) {
2902 case OBD_IOC_LOV_GET_CONFIG: {
2903 char *buf;
2904 struct lov_desc *desc;
2905 struct obd_uuid uuid;
2906
2907 buf = NULL;
2908 len = 0;
2909 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2910 GOTO(out, err = -EINVAL);
2911
2912 data = (struct obd_ioctl_data *)buf;
2913
2914 if (sizeof(*desc) > data->ioc_inllen1) {
2915 obd_ioctl_freedata(buf, len);
2916 GOTO(out, err = -EINVAL);
2917 }
2918
2919 if (data->ioc_inllen2 < sizeof(uuid)) {
2920 obd_ioctl_freedata(buf, len);
2921 GOTO(out, err = -EINVAL);
2922 }
2923
2924 desc = (struct lov_desc *)data->ioc_inlbuf1;
2925 desc->ld_tgt_count = 1;
2926 desc->ld_active_tgt_count = 1;
2927 desc->ld_default_stripe_count = 1;
2928 desc->ld_default_stripe_size = 0;
2929 desc->ld_default_stripe_offset = 0;
2930 desc->ld_pattern = 0;
2931 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2932
2933 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2934
2935 err = copy_to_user((void *)uarg, buf, len);
2936 if (err)
2937 err = -EFAULT;
2938 obd_ioctl_freedata(buf, len);
2939 GOTO(out, err);
2940 }
2941 case LL_IOC_LOV_SETSTRIPE:
2942 err = obd_alloc_memmd(exp, karg);
2943 if (err > 0)
2944 err = 0;
2945 GOTO(out, err);
2946 case LL_IOC_LOV_GETSTRIPE:
2947 err = osc_getstripe(karg, uarg);
2948 GOTO(out, err);
2949 case OBD_IOC_CLIENT_RECOVER:
2950 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2951 data->ioc_inlbuf1, 0);
2952 if (err > 0)
2953 err = 0;
2954 GOTO(out, err);
2955 case IOC_OSC_SET_ACTIVE:
2956 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2957 data->ioc_offset);
2958 GOTO(out, err);
2959 case OBD_IOC_POLL_QUOTACHECK:
2960 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2961 GOTO(out, err);
2962 case OBD_IOC_PING_TARGET:
2963 err = ptlrpc_obd_ping(obd);
2964 GOTO(out, err);
2965 default:
2966 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2967 cmd, current_comm());
2968 GOTO(out, err = -ENOTTY);
2969 }
2970out:
2971 module_put(THIS_MODULE);
2972 return err;
2973}
2974
/*
 * obd_ops::o_get_info method: answer a keyed query.
 *
 * Supported keys:
 *  - KEY_LOCK_TO_STRIPE: answered locally, the stripe is always 0;
 *  - KEY_LAST_ID: synchronous OST_GET_INFO RPC, the returned obd_id is
 *    stored in \a val;
 *  - KEY_FIEMAP: synchronous OST_GET_INFO RPC; \a val is sent as input and
 *    overwritten with the reply.
 *
 * \param keylen  length of \a key
 * \param key     key selecting the query (used by the KEY_IS() checks)
 * \param vallen  size of \a val; set for KEY_LOCK_TO_STRIPE
 * \param val     value buffer
 * \param lsm     not used in this function
 *
 * \retval 0        on success
 * \retval -EFAULT  \a vallen or \a val is NULL
 * \retval -EINVAL  unknown key
 * \retval -ENOMEM, -EPROTO or an RPC error otherwise
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			obd_count keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		obd_id *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		/* this query is neither delayed nor resent */
		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			GOTO(out, rc);

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL)
			GOTO(out, rc = -EPROTO);

		*((obd_id *)val) = *reply;
	out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		/* stays 0 unless a lock is matched below; checked at
		 * drop_lock to decide whether a reference must be released */
		ldlm_mode_t mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

		/* Locking is only done when FIEMAP_FLAG_SYNC is requested. */
		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		/* Use OBD_OBJECT_EOF as the end when start + length, rounded
		 * up, would reach or pass it; otherwise round the end up. */
		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
				fm_key->fiemap.fm_length +
				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) {
			/* A matching lock is cached: hold it as LCK_PR until
			 * drop_lock. */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else {
			/* No cached lock: flag the request with
			 * OBD_FL_SRVLOCK (NOTE(review): presumably asks the
			 * server to do the locking itself - confirm). */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL)
			GOTO(drop_lock, rc = -ENOMEM);

		/* The value travels in both directions with the same size. */
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			GOTO(drop_lock, rc);
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			GOTO(fini_req, rc);

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL)
			GOTO(fini_req, rc = -EPROTO);

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}
3109
3110static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3111 obd_count keylen, void *key, obd_count vallen,
3112 void *val, struct ptlrpc_request_set *set)
3113{
3114 struct ptlrpc_request *req;
3115 struct obd_device *obd = exp->exp_obd;
3116 struct obd_import *imp = class_exp2cliimp(exp);
3117 char *tmp;
3118 int rc;
3119
3120 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3121
3122 if (KEY_IS(KEY_CHECKSUM)) {
3123 if (vallen != sizeof(int))
3124 return -EINVAL;
3125 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3126 return 0;
3127 }
3128
3129 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3130 sptlrpc_conf_client_adapt(obd);
3131 return 0;
3132 }
3133
3134 if (KEY_IS(KEY_FLUSH_CTX)) {
3135 sptlrpc_import_flush_my_ctx(imp);
3136 return 0;
3137 }
3138
3139 if (KEY_IS(KEY_CACHE_SET)) {
3140 struct client_obd *cli = &obd->u.cli;
3141
3142 LASSERT(cli->cl_cache == NULL);
3143 cli->cl_cache = (struct cl_client_cache *)val;
3144 atomic_inc(&cli->cl_cache->ccc_users);
3145 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3146
3147
3148 LASSERT(list_empty(&cli->cl_lru_osc));
3149 spin_lock(&cli->cl_cache->ccc_lru_lock);
3150 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3151 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3152
3153 return 0;
3154 }
3155
3156 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3157 struct client_obd *cli = &obd->u.cli;
3158 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3159 int target = *(int *)val;
3160
3161 nr = osc_lru_shrink(cli, min(nr, target));
3162 *(int *)val -= nr;
3163 return 0;
3164 }
3165
3166 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3167 return -EINVAL;
3168
3169
3170
3171
3172
3173
3174
3175
3176 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3177 &RQF_OST_SET_GRANT_INFO :
3178 &RQF_OBD_SET_INFO);
3179 if (req == NULL)
3180 return -ENOMEM;
3181
3182 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3183 RCL_CLIENT, keylen);
3184 if (!KEY_IS(KEY_GRANT_SHRINK))
3185 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3186 RCL_CLIENT, vallen);
3187 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3188 if (rc) {
3189 ptlrpc_request_free(req);
3190 return rc;
3191 }
3192
3193 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3194 memcpy(tmp, key, keylen);
3195 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3196 &RMF_OST_BODY :
3197 &RMF_SETINFO_VAL);
3198 memcpy(tmp, val, vallen);
3199
3200 if (KEY_IS(KEY_GRANT_SHRINK)) {
3201 struct osc_grant_args *aa;
3202 struct obdo *oa;
3203
3204 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3205 aa = ptlrpc_req_async_args(req);
3206 OBDO_ALLOC(oa);
3207 if (!oa) {
3208 ptlrpc_req_finished(req);
3209 return -ENOMEM;
3210 }
3211 *oa = ((struct ost_body *)val)->oa;
3212 aa->aa_oa = oa;
3213 req->rq_interpret_reply = osc_shrink_grant_interpret;
3214 }
3215
3216 ptlrpc_request_set_replen(req);
3217 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3218 LASSERT(set != NULL);
3219 ptlrpc_set_add_req(set, req);
3220 ptlrpc_check_set(NULL, set);
3221 } else
3222 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3223
3224 return 0;
3225}
3226
3227
/*
 * obd_ops::o_llog_init method.  Calling it is treated as a fatal bug: the
 * body asserts with LBUG() unconditionally.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
			 struct obd_device *disk_obd, int *index)
{
	LBUG();

	return 0;
}
3236
3237static int osc_llog_finish(struct obd_device *obd, int count)
3238{
3239 struct llog_ctxt *ctxt;
3240
3241 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3242 if (ctxt) {
3243 llog_cat_close(NULL, ctxt->loc_handle);
3244 llog_cleanup(NULL, ctxt);
3245 }
3246
3247 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3248 if (ctxt)
3249 llog_cleanup(NULL, ctxt);
3250 return 0;
3251}
3252
3253static int osc_reconnect(const struct lu_env *env,
3254 struct obd_export *exp, struct obd_device *obd,
3255 struct obd_uuid *cluuid,
3256 struct obd_connect_data *data,
3257 void *localdata)
3258{
3259 struct client_obd *cli = &obd->u.cli;
3260
3261 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3262 long lost_grant;
3263
3264 client_obd_list_lock(&cli->cl_loi_list_lock);
3265 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3266 2 * cli_brw_size(obd);
3267 lost_grant = cli->cl_lost_grant;
3268 cli->cl_lost_grant = 0;
3269 client_obd_list_unlock(&cli->cl_loi_list_lock);
3270
3271 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3272 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3273 data->ocd_version, data->ocd_grant, lost_grant);
3274 }
3275
3276 return 0;
3277}
3278
3279static int osc_disconnect(struct obd_export *exp)
3280{
3281 struct obd_device *obd = class_exp2obd(exp);
3282 struct llog_ctxt *ctxt;
3283 int rc;
3284
3285 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3286 if (ctxt) {
3287 if (obd->u.cli.cl_conn_count == 1) {
3288
3289
3290 llog_sync(ctxt, exp, 0);
3291 }
3292 llog_ctxt_put(ctxt);
3293 } else {
3294 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3295 obd);
3296 }
3297
3298 rc = client_disconnect_export(exp);
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316 if (obd->u.cli.cl_import == NULL)
3317 osc_del_shrink_grant(&obd->u.cli);
3318 return rc;
3319}
3320
3321static int osc_import_event(struct obd_device *obd,
3322 struct obd_import *imp,
3323 enum obd_import_event event)
3324{
3325 struct client_obd *cli;
3326 int rc = 0;
3327
3328 LASSERT(imp->imp_obd == obd);
3329
3330 switch (event) {
3331 case IMP_EVENT_DISCON: {
3332 cli = &obd->u.cli;
3333 client_obd_list_lock(&cli->cl_loi_list_lock);
3334 cli->cl_avail_grant = 0;
3335 cli->cl_lost_grant = 0;
3336 client_obd_list_unlock(&cli->cl_loi_list_lock);
3337 break;
3338 }
3339 case IMP_EVENT_INACTIVE: {
3340 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3341 break;
3342 }
3343 case IMP_EVENT_INVALIDATE: {
3344 struct ldlm_namespace *ns = obd->obd_namespace;
3345 struct lu_env *env;
3346 int refcheck;
3347
3348 env = cl_env_get(&refcheck);
3349 if (!IS_ERR(env)) {
3350
3351 cli = &obd->u.cli;
3352
3353
3354 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3355
3356 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3357 cl_env_put(env, &refcheck);
3358 } else
3359 rc = PTR_ERR(env);
3360 break;
3361 }
3362 case IMP_EVENT_ACTIVE: {
3363 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3364 break;
3365 }
3366 case IMP_EVENT_OCD: {
3367 struct obd_connect_data *ocd = &imp->imp_connect_data;
3368
3369 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3370 osc_init_grant(&obd->u.cli, ocd);
3371
3372
3373 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3374 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3375
3376 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3377 break;
3378 }
3379 case IMP_EVENT_DEACTIVATE: {
3380 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3381 break;
3382 }
3383 case IMP_EVENT_ACTIVATE: {
3384 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3385 break;
3386 }
3387 default:
3388 CERROR("Unknown import event %d\n", event);
3389 LBUG();
3390 }
3391 return rc;
3392}
3393
3394
3395
3396
3397
3398
3399
3400
3401static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3402{
3403 check_res_locked(lock->l_resource);
3404
3405
3406
3407
3408
3409
3410
3411 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3412 (lock->l_granted_mode == LCK_PR ||
3413 lock->l_granted_mode == LCK_CR) &&
3414 (osc_dlm_lock_pageref(lock) == 0))
3415 return 1;
3416
3417 return 0;
3418}
3419
3420static int brw_queue_work(const struct lu_env *env, void *data)
3421{
3422 struct client_obd *cli = data;
3423
3424 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3425
3426 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3427 return 0;
3428}
3429
3430int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3431{
3432 struct lprocfs_static_vars lvars = { 0 };
3433 struct client_obd *cli = &obd->u.cli;
3434 void *handler;
3435 int rc;
3436
3437 rc = ptlrpcd_addref();
3438 if (rc)
3439 return rc;
3440
3441 rc = client_obd_setup(obd, lcfg);
3442 if (rc)
3443 GOTO(out_ptlrpcd, rc);
3444
3445 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3446 if (IS_ERR(handler))
3447 GOTO(out_client_setup, rc = PTR_ERR(handler));
3448 cli->cl_writeback_work = handler;
3449
3450 rc = osc_quota_setup(obd);
3451 if (rc)
3452 GOTO(out_ptlrpcd_work, rc);
3453
3454 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3455 lprocfs_osc_init_vars(&lvars);
3456 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3457 lproc_osc_attach_seqstat(obd);
3458 sptlrpc_lprocfs_cliobd_attach(obd);
3459 ptlrpc_lprocfs_register_obd(obd);
3460 }
3461
3462
3463
3464
3465
3466
3467 cli->cl_import->imp_rq_pool =
3468 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3469 OST_MAXREQSIZE,
3470 ptlrpc_add_rqs_to_pool);
3471
3472 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3473 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3474 return rc;
3475
3476out_ptlrpcd_work:
3477 ptlrpcd_destroy_work(handler);
3478out_client_setup:
3479 client_obd_cleanup(obd);
3480out_ptlrpcd:
3481 ptlrpcd_decref();
3482 return rc;
3483}
3484
3485static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3486{
3487 int rc = 0;
3488
3489 switch (stage) {
3490 case OBD_CLEANUP_EARLY: {
3491 struct obd_import *imp;
3492 imp = obd->u.cli.cl_import;
3493 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3494
3495 ptlrpc_deactivate_import(imp);
3496 spin_lock(&imp->imp_lock);
3497 imp->imp_pingable = 0;
3498 spin_unlock(&imp->imp_lock);
3499 break;
3500 }
3501 case OBD_CLEANUP_EXPORTS: {
3502 struct client_obd *cli = &obd->u.cli;
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512 obd_zombie_barrier();
3513 if (cli->cl_writeback_work) {
3514 ptlrpcd_destroy_work(cli->cl_writeback_work);
3515 cli->cl_writeback_work = NULL;
3516 }
3517 obd_cleanup_client_import(obd);
3518 ptlrpc_lprocfs_unregister_obd(obd);
3519 lprocfs_obd_cleanup(obd);
3520 rc = obd_llog_finish(obd, 0);
3521 if (rc != 0)
3522 CERROR("failed to cleanup llogging subsystems\n");
3523 break;
3524 }
3525 }
3526 return rc;
3527}
3528
3529int osc_cleanup(struct obd_device *obd)
3530{
3531 struct client_obd *cli = &obd->u.cli;
3532 int rc;
3533
3534
3535 if (cli->cl_cache != NULL) {
3536 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3537 spin_lock(&cli->cl_cache->ccc_lru_lock);
3538 list_del_init(&cli->cl_lru_osc);
3539 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3540 cli->cl_lru_left = NULL;
3541 atomic_dec(&cli->cl_cache->ccc_users);
3542 cli->cl_cache = NULL;
3543 }
3544
3545
3546 osc_quota_cleanup(obd);
3547
3548 rc = client_obd_cleanup(obd);
3549
3550 ptlrpcd_decref();
3551 return rc;
3552}
3553
3554int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3555{
3556 struct lprocfs_static_vars lvars = { 0 };
3557 int rc = 0;
3558
3559 lprocfs_osc_init_vars(&lvars);
3560
3561 switch (lcfg->lcfg_command) {
3562 default:
3563 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3564 lcfg, obd);
3565 if (rc > 0)
3566 rc = 0;
3567 break;
3568 }
3569
3570 return(rc);
3571}
3572
3573static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3574{
3575 return osc_process_config_base(obd, buf);
3576}
3577
/*
 * OBD method table of the OSC device type; registered with
 * class_register_type() in osc_init().
 */
struct obd_ops osc_obd_ops = {
	.o_owner = THIS_MODULE,
	/* device life cycle */
	.o_setup = osc_setup,
	.o_precleanup = osc_precleanup,
	.o_cleanup = osc_cleanup,
	/* connection handling; generic client helpers except for
	 * reconnect and disconnect */
	.o_add_conn = client_import_add_conn,
	.o_del_conn = client_import_del_conn,
	.o_connect = client_connect_import,
	.o_reconnect = osc_reconnect,
	.o_disconnect = osc_disconnect,
	.o_statfs = osc_statfs,
	.o_statfs_async = osc_statfs_async,
	/* stripe metadata */
	.o_packmd = osc_packmd,
	.o_unpackmd = osc_unpackmd,
	/* object operations */
	.o_create = osc_create,
	.o_destroy = osc_destroy,
	.o_getattr = osc_getattr,
	.o_getattr_async = osc_getattr_async,
	.o_setattr = osc_setattr,
	.o_setattr_async = osc_setattr_async,
	.o_brw = osc_brw,
	.o_punch = osc_punch,
	.o_sync = osc_sync,
	/* locking */
	.o_enqueue = osc_enqueue,
	.o_change_cbdata = osc_change_cbdata,
	.o_find_cbdata = osc_find_cbdata,
	.o_cancel = osc_cancel,
	.o_cancel_unused = osc_cancel_unused,
	/* control and configuration */
	.o_iocontrol = osc_iocontrol,
	.o_get_info = osc_get_info,
	.o_set_info_async = osc_set_info_async,
	.o_import_event = osc_import_event,
	.o_llog_init = osc_llog_init,
	.o_llog_finish = osc_llog_finish,
	.o_process_config = osc_process_config,
	/* quota */
	.o_quotactl = osc_quotactl,
	.o_quotacheck = osc_quotacheck,
};
3616
/* Defined elsewhere in the OSC module; used by osc_init() and osc_exit(). */
extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;
3620
3621int __init osc_init(void)
3622{
3623 struct lprocfs_static_vars lvars = { 0 };
3624 int rc;
3625
3626
3627
3628
3629 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3630
3631 rc = lu_kmem_init(osc_caches);
3632 if (rc)
3633 return rc;
3634
3635 lprocfs_osc_init_vars(&lvars);
3636
3637 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3638 LUSTRE_OSC_NAME, &osc_device_type);
3639 if (rc) {
3640 lu_kmem_fini(osc_caches);
3641 return rc;
3642 }
3643
3644 spin_lock_init(&osc_ast_guard);
3645 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3646
3647 return rc;
3648}
3649
3650static void osc_exit(void)
3651{
3652 class_unregister_type(LUSTRE_OSC_NAME);
3653 lu_kmem_fini(osc_caches);
3654}
3655
/* Module metadata. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

/* Register osc_init() and osc_exit() as the module's load/unload hooks. */
module_init(osc_init);
module_exit(osc_exit);
3663