1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_OSC
38
39#include "../../include/linux/libcfs/libcfs.h"
40
41
42#include "../include/lustre_dlm.h"
43#include "../include/lustre_net.h"
44#include "../include/lustre/lustre_user.h"
45#include "../include/obd_cksum.h"
46#include "../include/obd_ost.h"
47
48#include "../include/lustre_ha.h"
49#include "../include/lprocfs_status.h"
50#include "../include/lustre_log.h"
51#include "../include/lustre_debug.h"
52#include "../include/lustre_param.h"
53#include "../include/lustre_fid.h"
54#include "osc_internal.h"
55#include "osc_cl_internal.h"
56
57static void osc_release_ppga(struct brw_page **ppga, obd_count count);
58static int brw_interpret(const struct lu_env *env,
59 struct ptlrpc_request *req, void *data, int rc);
60int osc_cleanup(struct obd_device *obd);
61
62
63static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
64 struct lov_stripe_md *lsm)
65{
66 int lmm_size;
67
68 lmm_size = sizeof(**lmmp);
69 if (lmmp == NULL)
70 return lmm_size;
71
72 if (*lmmp != NULL && lsm == NULL) {
73 OBD_FREE(*lmmp, lmm_size);
74 *lmmp = NULL;
75 return 0;
76 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
77 return -EBADF;
78 }
79
80 if (*lmmp == NULL) {
81 OBD_ALLOC(*lmmp, lmm_size);
82 if (*lmmp == NULL)
83 return -ENOMEM;
84 }
85
86 if (lsm)
87 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
88
89 return lmm_size;
90}
91
92
93static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
94 struct lov_mds_md *lmm, int lmm_bytes)
95{
96 int lsm_size;
97 struct obd_import *imp = class_exp2cliimp(exp);
98
99 if (lmm != NULL) {
100 if (lmm_bytes < sizeof(*lmm)) {
101 CERROR("%s: lov_mds_md too small: %d, need %d\n",
102 exp->exp_obd->obd_name, lmm_bytes,
103 (int)sizeof(*lmm));
104 return -EINVAL;
105 }
106
107
108 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
109 CERROR("%s: zero lmm_object_id: rc = %d\n",
110 exp->exp_obd->obd_name, -EINVAL);
111 return -EINVAL;
112 }
113 }
114
115 lsm_size = lov_stripe_md_size(1);
116 if (lsmp == NULL)
117 return lsm_size;
118
119 if (*lsmp != NULL && lmm == NULL) {
120 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
121 OBD_FREE(*lsmp, lsm_size);
122 *lsmp = NULL;
123 return 0;
124 }
125
126 if (*lsmp == NULL) {
127 OBD_ALLOC(*lsmp, lsm_size);
128 if (unlikely(*lsmp == NULL))
129 return -ENOMEM;
130 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
132 OBD_FREE(*lsmp, lsm_size);
133 return -ENOMEM;
134 }
135 loi_init((*lsmp)->lsm_oinfo[0]);
136 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
137 return -EBADF;
138 }
139
140 if (lmm != NULL)
141
142 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
143
144 if (imp != NULL &&
145 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
146 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
147 else
148 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
149
150 return lsm_size;
151}
152
153static inline void osc_pack_capa(struct ptlrpc_request *req,
154 struct ost_body *body, void *capa)
155{
156 struct obd_capa *oc = (struct obd_capa *)capa;
157 struct lustre_capa *c;
158
159 if (!capa)
160 return;
161
162 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
163 LASSERT(c);
164 capa_cpy(c, oc);
165 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
166 DEBUG_CAPA(D_SEC, c, "pack");
167}
168
169static inline void osc_pack_req_body(struct ptlrpc_request *req,
170 struct obd_info *oinfo)
171{
172 struct ost_body *body;
173
174 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
175 LASSERT(body);
176
177 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
178 oinfo->oi_oa);
179 osc_pack_capa(req, body, oinfo->oi_capa);
180}
181
182static inline void osc_set_capa_size(struct ptlrpc_request *req,
183 const struct req_msg_field *field,
184 struct obd_capa *oc)
185{
186 if (oc == NULL)
187 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
188 else
189
190 ;
191}
192
193static int osc_getattr_interpret(const struct lu_env *env,
194 struct ptlrpc_request *req,
195 struct osc_async_args *aa, int rc)
196{
197 struct ost_body *body;
198
199 if (rc != 0)
200 GOTO(out, rc);
201
202 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
203 if (body) {
204 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
205 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
206 aa->aa_oi->oi_oa, &body->oa);
207
208
209 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
210 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
211 } else {
212 CDEBUG(D_INFO, "can't unpack ost_body\n");
213 rc = -EPROTO;
214 aa->aa_oi->oi_oa->o_valid = 0;
215 }
216out:
217 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
218 return rc;
219}
220
221static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
222 struct ptlrpc_request_set *set)
223{
224 struct ptlrpc_request *req;
225 struct osc_async_args *aa;
226 int rc;
227
228 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
229 if (req == NULL)
230 return -ENOMEM;
231
232 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
233 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
234 if (rc) {
235 ptlrpc_request_free(req);
236 return rc;
237 }
238
239 osc_pack_req_body(req, oinfo);
240
241 ptlrpc_request_set_replen(req);
242 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
243
244 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
245 aa = ptlrpc_req_async_args(req);
246 aa->aa_oi = oinfo;
247
248 ptlrpc_set_add_req(set, req);
249 return 0;
250}
251
252static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
253 struct obd_info *oinfo)
254{
255 struct ptlrpc_request *req;
256 struct ost_body *body;
257 int rc;
258
259 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
260 if (req == NULL)
261 return -ENOMEM;
262
263 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
264 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
265 if (rc) {
266 ptlrpc_request_free(req);
267 return rc;
268 }
269
270 osc_pack_req_body(req, oinfo);
271
272 ptlrpc_request_set_replen(req);
273
274 rc = ptlrpc_queue_wait(req);
275 if (rc)
276 GOTO(out, rc);
277
278 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
279 if (body == NULL)
280 GOTO(out, rc = -EPROTO);
281
282 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
283 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
284 &body->oa);
285
286 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
287 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
288
289 out:
290 ptlrpc_req_finished(req);
291 return rc;
292}
293
294static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
295 struct obd_info *oinfo, struct obd_trans_info *oti)
296{
297 struct ptlrpc_request *req;
298 struct ost_body *body;
299 int rc;
300
301 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
302
303 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
304 if (req == NULL)
305 return -ENOMEM;
306
307 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
308 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
309 if (rc) {
310 ptlrpc_request_free(req);
311 return rc;
312 }
313
314 osc_pack_req_body(req, oinfo);
315
316 ptlrpc_request_set_replen(req);
317
318 rc = ptlrpc_queue_wait(req);
319 if (rc)
320 GOTO(out, rc);
321
322 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
323 if (body == NULL)
324 GOTO(out, rc = -EPROTO);
325
326 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
327 &body->oa);
328
329out:
330 ptlrpc_req_finished(req);
331 return rc;
332}
333
334static int osc_setattr_interpret(const struct lu_env *env,
335 struct ptlrpc_request *req,
336 struct osc_setattr_args *sa, int rc)
337{
338 struct ost_body *body;
339
340 if (rc != 0)
341 GOTO(out, rc);
342
343 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344 if (body == NULL)
345 GOTO(out, rc = -EPROTO);
346
347 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
348 &body->oa);
349out:
350 rc = sa->sa_upcall(sa->sa_cookie, rc);
351 return rc;
352}
353
354int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
355 struct obd_trans_info *oti,
356 obd_enqueue_update_f upcall, void *cookie,
357 struct ptlrpc_request_set *rqset)
358{
359 struct ptlrpc_request *req;
360 struct osc_setattr_args *sa;
361 int rc;
362
363 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
364 if (req == NULL)
365 return -ENOMEM;
366
367 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
368 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
369 if (rc) {
370 ptlrpc_request_free(req);
371 return rc;
372 }
373
374 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
375 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
376
377 osc_pack_req_body(req, oinfo);
378
379 ptlrpc_request_set_replen(req);
380
381
382 if (!rqset) {
383
384 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
385 } else {
386 req->rq_interpret_reply =
387 (ptlrpc_interpterer_t)osc_setattr_interpret;
388
389 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
390 sa = ptlrpc_req_async_args(req);
391 sa->sa_oa = oinfo->oi_oa;
392 sa->sa_upcall = upcall;
393 sa->sa_cookie = cookie;
394
395 if (rqset == PTLRPCD_SET)
396 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
397 else
398 ptlrpc_set_add_req(rqset, req);
399 }
400
401 return 0;
402}
403
404static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
405 struct obd_trans_info *oti,
406 struct ptlrpc_request_set *rqset)
407{
408 return osc_setattr_async_base(exp, oinfo, oti,
409 oinfo->oi_cb_up, oinfo, rqset);
410}
411
412int osc_real_create(struct obd_export *exp, struct obdo *oa,
413 struct lov_stripe_md **ea, struct obd_trans_info *oti)
414{
415 struct ptlrpc_request *req;
416 struct ost_body *body;
417 struct lov_stripe_md *lsm;
418 int rc;
419
420 LASSERT(oa);
421 LASSERT(ea);
422
423 lsm = *ea;
424 if (!lsm) {
425 rc = obd_alloc_memmd(exp, &lsm);
426 if (rc < 0)
427 return rc;
428 }
429
430 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
431 if (req == NULL)
432 GOTO(out, rc = -ENOMEM);
433
434 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
435 if (rc) {
436 ptlrpc_request_free(req);
437 GOTO(out, rc);
438 }
439
440 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441 LASSERT(body);
442
443 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
444
445 ptlrpc_request_set_replen(req);
446
447 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448 oa->o_flags == OBD_FL_DELORPHAN) {
449 DEBUG_REQ(D_HA, req,
450 "delorphan from OST integration");
451
452 req->rq_no_resend = req->rq_no_delay = 1;
453 }
454
455 rc = ptlrpc_queue_wait(req);
456 if (rc)
457 GOTO(out_req, rc);
458
459 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
460 if (body == NULL)
461 GOTO(out_req, rc = -EPROTO);
462
463 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
464 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
465
466 oa->o_blksize = cli_brw_size(exp->exp_obd);
467 oa->o_valid |= OBD_MD_FLBLKSZ;
468
469
470
471
472
473 lsm->lsm_oi = oa->o_oi;
474 *ea = lsm;
475
476 if (oti != NULL) {
477 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
478
479 if (oa->o_valid & OBD_MD_FLCOOKIE) {
480 if (!oti->oti_logcookies)
481 oti_alloc_cookies(oti, 1);
482 *oti->oti_logcookies = oa->o_lcookie;
483 }
484 }
485
486 CDEBUG(D_HA, "transno: %lld\n",
487 lustre_msg_get_transno(req->rq_repmsg));
488out_req:
489 ptlrpc_req_finished(req);
490out:
491 if (rc && !*ea)
492 obd_free_memmd(exp, &lsm);
493 return rc;
494}
495
496int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
497 obd_enqueue_update_f upcall, void *cookie,
498 struct ptlrpc_request_set *rqset)
499{
500 struct ptlrpc_request *req;
501 struct osc_setattr_args *sa;
502 struct ost_body *body;
503 int rc;
504
505 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
506 if (req == NULL)
507 return -ENOMEM;
508
509 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
510 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
511 if (rc) {
512 ptlrpc_request_free(req);
513 return rc;
514 }
515 req->rq_request_portal = OST_IO_PORTAL;
516 ptlrpc_at_set_req_timeout(req);
517
518 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
519 LASSERT(body);
520 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
521 oinfo->oi_oa);
522 osc_pack_capa(req, body, oinfo->oi_capa);
523
524 ptlrpc_request_set_replen(req);
525
526 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
527 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
528 sa = ptlrpc_req_async_args(req);
529 sa->sa_oa = oinfo->oi_oa;
530 sa->sa_upcall = upcall;
531 sa->sa_cookie = cookie;
532 if (rqset == PTLRPCD_SET)
533 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
534 else
535 ptlrpc_set_add_req(rqset, req);
536
537 return 0;
538}
539
540static int osc_punch(const struct lu_env *env, struct obd_export *exp,
541 struct obd_info *oinfo, struct obd_trans_info *oti,
542 struct ptlrpc_request_set *rqset)
543{
544 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
545 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
546 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
547 return osc_punch_base(exp, oinfo,
548 oinfo->oi_cb_up, oinfo, rqset);
549}
550
551static int osc_sync_interpret(const struct lu_env *env,
552 struct ptlrpc_request *req,
553 void *arg, int rc)
554{
555 struct osc_fsync_args *fa = arg;
556 struct ost_body *body;
557
558 if (rc)
559 GOTO(out, rc);
560
561 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
562 if (body == NULL) {
563 CERROR ("can't unpack ost_body\n");
564 GOTO(out, rc = -EPROTO);
565 }
566
567 *fa->fa_oi->oi_oa = body->oa;
568out:
569 rc = fa->fa_upcall(fa->fa_cookie, rc);
570 return rc;
571}
572
573int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
574 obd_enqueue_update_f upcall, void *cookie,
575 struct ptlrpc_request_set *rqset)
576{
577 struct ptlrpc_request *req;
578 struct ost_body *body;
579 struct osc_fsync_args *fa;
580 int rc;
581
582 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
583 if (req == NULL)
584 return -ENOMEM;
585
586 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
587 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
588 if (rc) {
589 ptlrpc_request_free(req);
590 return rc;
591 }
592
593
594 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
595 LASSERT(body);
596 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
597 oinfo->oi_oa);
598 osc_pack_capa(req, body, oinfo->oi_capa);
599
600 ptlrpc_request_set_replen(req);
601 req->rq_interpret_reply = osc_sync_interpret;
602
603 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
604 fa = ptlrpc_req_async_args(req);
605 fa->fa_oi = oinfo;
606 fa->fa_upcall = upcall;
607 fa->fa_cookie = cookie;
608
609 if (rqset == PTLRPCD_SET)
610 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
611 else
612 ptlrpc_set_add_req(rqset, req);
613
614 return 0;
615}
616
617static int osc_sync(const struct lu_env *env, struct obd_export *exp,
618 struct obd_info *oinfo, obd_size start, obd_size end,
619 struct ptlrpc_request_set *set)
620{
621 if (!oinfo->oi_oa) {
622 CDEBUG(D_INFO, "oa NULL\n");
623 return -EINVAL;
624 }
625
626 oinfo->oi_oa->o_size = start;
627 oinfo->oi_oa->o_blocks = end;
628 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
629
630 return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
631}
632
633
634
635
636static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
637 struct list_head *cancels,
638 ldlm_mode_t mode, __u64 lock_flags)
639{
640 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
641 struct ldlm_res_id res_id;
642 struct ldlm_resource *res;
643 int count;
644
645
646
647
648
649
650
651 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
652 return 0;
653
654 ostid_build_res_name(&oa->o_oi, &res_id);
655 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
656 if (res == NULL)
657 return 0;
658
659 LDLM_RESOURCE_ADDREF(res);
660 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
661 lock_flags, 0, NULL);
662 LDLM_RESOURCE_DELREF(res);
663 ldlm_resource_putref(res);
664 return count;
665}
666
667static int osc_destroy_interpret(const struct lu_env *env,
668 struct ptlrpc_request *req, void *data,
669 int rc)
670{
671 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
672
673 atomic_dec(&cli->cl_destroy_in_flight);
674 wake_up(&cli->cl_destroy_waitq);
675 return 0;
676}
677
678static int osc_can_send_destroy(struct client_obd *cli)
679{
680 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
681 cli->cl_max_rpcs_in_flight) {
682
683 return 1;
684 }
685 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
686 cli->cl_max_rpcs_in_flight) {
687
688
689
690
691 wake_up(&cli->cl_destroy_waitq);
692 }
693 return 0;
694}
695
696int osc_create(const struct lu_env *env, struct obd_export *exp,
697 struct obdo *oa, struct lov_stripe_md **ea,
698 struct obd_trans_info *oti)
699{
700 int rc = 0;
701
702 LASSERT(oa);
703 LASSERT(ea);
704 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
705
706 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
707 oa->o_flags == OBD_FL_RECREATE_OBJS) {
708 return osc_real_create(exp, oa, ea, oti);
709 }
710
711 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
712 return osc_real_create(exp, oa, ea, oti);
713
714
715 LBUG();
716
717 return rc;
718}
719
720
721
722
723
724
725
726
727
728
729
730static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
731 struct obdo *oa, struct lov_stripe_md *ea,
732 struct obd_trans_info *oti, struct obd_export *md_export,
733 void *capa)
734{
735 struct client_obd *cli = &exp->exp_obd->u.cli;
736 struct ptlrpc_request *req;
737 struct ost_body *body;
738 LIST_HEAD(cancels);
739 int rc, count;
740
741 if (!oa) {
742 CDEBUG(D_INFO, "oa NULL\n");
743 return -EINVAL;
744 }
745
746 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
747 LDLM_FL_DISCARD_DATA);
748
749 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
750 if (req == NULL) {
751 ldlm_lock_list_put(&cancels, l_bl_ast, count);
752 return -ENOMEM;
753 }
754
755 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
756 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
757 0, &cancels, count);
758 if (rc) {
759 ptlrpc_request_free(req);
760 return rc;
761 }
762
763 req->rq_request_portal = OST_IO_PORTAL;
764 ptlrpc_at_set_req_timeout(req);
765
766 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
767 oa->o_lcookie = *oti->oti_logcookies;
768 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
769 LASSERT(body);
770 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
771
772 osc_pack_capa(req, body, (struct obd_capa *)capa);
773 ptlrpc_request_set_replen(req);
774
775
776
777
778
779 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
780 req->rq_interpret_reply = osc_destroy_interpret;
781 if (!osc_can_send_destroy(cli)) {
782 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
783 NULL);
784
785
786
787
788
789 l_wait_event_exclusive(cli->cl_destroy_waitq,
790 osc_can_send_destroy(cli), &lwi);
791 }
792 }
793
794
795 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
796 return 0;
797}
798
799static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
800 long writing_bytes)
801{
802 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
803
804 LASSERT(!(oa->o_valid & bits));
805
806 oa->o_valid |= bits;
807 client_obd_list_lock(&cli->cl_loi_list_lock);
808 oa->o_dirty = cli->cl_dirty;
809 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
810 cli->cl_dirty_max)) {
811 CERROR("dirty %lu - %lu > dirty_max %lu\n",
812 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
813 oa->o_undirty = 0;
814 } else if (unlikely(atomic_read(&obd_dirty_pages) -
815 atomic_read(&obd_dirty_transit_pages) >
816 (long)(obd_max_dirty_pages + 1))) {
817
818
819
820 CERROR("dirty %d - %d > system dirty_max %d\n",
821 atomic_read(&obd_dirty_pages),
822 atomic_read(&obd_dirty_transit_pages),
823 obd_max_dirty_pages);
824 oa->o_undirty = 0;
825 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
826 CERROR("dirty %lu - dirty_max %lu too big???\n",
827 cli->cl_dirty, cli->cl_dirty_max);
828 oa->o_undirty = 0;
829 } else {
830 long max_in_flight = (cli->cl_max_pages_per_rpc <<
831 PAGE_CACHE_SHIFT)*
832 (cli->cl_max_rpcs_in_flight + 1);
833 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
834 }
835 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
836 oa->o_dropped = cli->cl_lost_grant;
837 cli->cl_lost_grant = 0;
838 client_obd_list_unlock(&cli->cl_loi_list_lock);
839 CDEBUG(D_CACHE,"dirty: %llu undirty: %u dropped %u grant: %llu\n",
840 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
841
842}
843
844void osc_update_next_shrink(struct client_obd *cli)
845{
846 cli->cl_next_shrink_grant =
847 cfs_time_shift(cli->cl_grant_shrink_interval);
848 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
849 cli->cl_next_shrink_grant);
850}
851
852static void __osc_update_grant(struct client_obd *cli, obd_size grant)
853{
854 client_obd_list_lock(&cli->cl_loi_list_lock);
855 cli->cl_avail_grant += grant;
856 client_obd_list_unlock(&cli->cl_loi_list_lock);
857}
858
859static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
860{
861 if (body->oa.o_valid & OBD_MD_FLGRANT) {
862 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
863 __osc_update_grant(cli, body->oa.o_grant);
864 }
865}
866
867static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
868 obd_count keylen, void *key, obd_count vallen,
869 void *val, struct ptlrpc_request_set *set);
870
871static int osc_shrink_grant_interpret(const struct lu_env *env,
872 struct ptlrpc_request *req,
873 void *aa, int rc)
874{
875 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
876 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
877 struct ost_body *body;
878
879 if (rc != 0) {
880 __osc_update_grant(cli, oa->o_grant);
881 GOTO(out, rc);
882 }
883
884 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
885 LASSERT(body);
886 osc_update_grant(cli, body);
887out:
888 OBDO_FREE(oa);
889 return rc;
890}
891
892static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
893{
894 client_obd_list_lock(&cli->cl_loi_list_lock);
895 oa->o_grant = cli->cl_avail_grant / 4;
896 cli->cl_avail_grant -= oa->o_grant;
897 client_obd_list_unlock(&cli->cl_loi_list_lock);
898 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
899 oa->o_valid |= OBD_MD_FLFLAGS;
900 oa->o_flags = 0;
901 }
902 oa->o_flags |= OBD_FL_SHRINK_GRANT;
903 osc_update_next_shrink(cli);
904}
905
906
907
908
909
910static int osc_shrink_grant(struct client_obd *cli)
911{
912 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
913 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
914
915 client_obd_list_lock(&cli->cl_loi_list_lock);
916 if (cli->cl_avail_grant <= target_bytes)
917 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
918 client_obd_list_unlock(&cli->cl_loi_list_lock);
919
920 return osc_shrink_grant_to_target(cli, target_bytes);
921}
922
923int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
924{
925 int rc = 0;
926 struct ost_body *body;
927
928 client_obd_list_lock(&cli->cl_loi_list_lock);
929
930
931
932 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
933 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
934
935 if (target_bytes >= cli->cl_avail_grant) {
936 client_obd_list_unlock(&cli->cl_loi_list_lock);
937 return 0;
938 }
939 client_obd_list_unlock(&cli->cl_loi_list_lock);
940
941 OBD_ALLOC_PTR(body);
942 if (!body)
943 return -ENOMEM;
944
945 osc_announce_cached(cli, &body->oa, 0);
946
947 client_obd_list_lock(&cli->cl_loi_list_lock);
948 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
949 cli->cl_avail_grant = target_bytes;
950 client_obd_list_unlock(&cli->cl_loi_list_lock);
951 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
952 body->oa.o_valid |= OBD_MD_FLFLAGS;
953 body->oa.o_flags = 0;
954 }
955 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
956 osc_update_next_shrink(cli);
957
958 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
959 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
960 sizeof(*body), body, NULL);
961 if (rc != 0)
962 __osc_update_grant(cli, body->oa.o_grant);
963 OBD_FREE_PTR(body);
964 return rc;
965}
966
967static int osc_should_shrink_grant(struct client_obd *client)
968{
969 unsigned long time = cfs_time_current();
970 unsigned long next_shrink = client->cl_next_shrink_grant;
971
972 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
973 OBD_CONNECT_GRANT_SHRINK) == 0)
974 return 0;
975
976 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
977
978
979
980 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
981
982 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
983 client->cl_avail_grant > brw_size)
984 return 1;
985 else
986 osc_update_next_shrink(client);
987 }
988 return 0;
989}
990
991static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
992{
993 struct client_obd *client;
994
995 list_for_each_entry(client, &item->ti_obd_list,
996 cl_grant_shrink_list) {
997 if (osc_should_shrink_grant(client))
998 osc_shrink_grant(client);
999 }
1000 return 0;
1001}
1002
1003static int osc_add_shrink_grant(struct client_obd *client)
1004{
1005 int rc;
1006
1007 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1008 TIMEOUT_GRANT,
1009 osc_grant_shrink_grant_cb, NULL,
1010 &client->cl_grant_shrink_list);
1011 if (rc) {
1012 CERROR("add grant client %s error %d\n",
1013 client->cl_import->imp_obd->obd_name, rc);
1014 return rc;
1015 }
1016 CDEBUG(D_CACHE, "add grant client %s \n",
1017 client->cl_import->imp_obd->obd_name);
1018 osc_update_next_shrink(client);
1019 return 0;
1020}
1021
1022static int osc_del_shrink_grant(struct client_obd *client)
1023{
1024 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1025 TIMEOUT_GRANT);
1026}
1027
1028static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1029{
1030
1031
1032
1033
1034
1035
1036
1037
1038 client_obd_list_lock(&cli->cl_loi_list_lock);
1039 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1040 cli->cl_avail_grant = ocd->ocd_grant;
1041 else
1042 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1043
1044 if (cli->cl_avail_grant < 0) {
1045 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1047 ocd->ocd_grant, cli->cl_dirty);
1048
1049
1050 cli->cl_avail_grant = ocd->ocd_grant;
1051 }
1052
1053
1054 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1055 client_obd_list_unlock(&cli->cl_loi_list_lock);
1056
1057 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1058 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1059 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1060
1061 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1062 list_empty(&cli->cl_grant_shrink_list))
1063 osc_add_shrink_grant(cli);
1064}
1065
1066
1067
1068
1069
1070static void handle_short_read(int nob_read, obd_count page_count,
1071 struct brw_page **pga)
1072{
1073 char *ptr;
1074 int i = 0;
1075
1076
1077 while (nob_read > 0) {
1078 LASSERT (page_count > 0);
1079
1080 if (pga[i]->count > nob_read) {
1081
1082 ptr = kmap(pga[i]->pg) +
1083 (pga[i]->off & ~CFS_PAGE_MASK);
1084 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1085 kunmap(pga[i]->pg);
1086 page_count--;
1087 i++;
1088 break;
1089 }
1090
1091 nob_read -= pga[i]->count;
1092 page_count--;
1093 i++;
1094 }
1095
1096
1097 while (page_count-- > 0) {
1098 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1099 memset(ptr, 0, pga[i]->count);
1100 kunmap(pga[i]->pg);
1101 i++;
1102 }
1103}
1104
1105static int check_write_rcs(struct ptlrpc_request *req,
1106 int requested_nob, int niocount,
1107 obd_count page_count, struct brw_page **pga)
1108{
1109 int i;
1110 __u32 *remote_rcs;
1111
1112 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1113 sizeof(*remote_rcs) *
1114 niocount);
1115 if (remote_rcs == NULL) {
1116 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1117 return(-EPROTO);
1118 }
1119
1120
1121 for (i = 0; i < niocount; i++) {
1122 if ((int)remote_rcs[i] < 0)
1123 return(remote_rcs[i]);
1124
1125 if (remote_rcs[i] != 0) {
1126 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1127 i, remote_rcs[i], req);
1128 return(-EPROTO);
1129 }
1130 }
1131
1132 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1133 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1134 req->rq_bulk->bd_nob_transferred, requested_nob);
1135 return(-EPROTO);
1136 }
1137
1138 return (0);
1139}
1140
1141static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1142{
1143 if (p1->flag != p2->flag) {
1144 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1145 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1146
1147
1148
1149 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1150 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1151 "report this at http://bugs.whamcloud.com/\n",
1152 p1->flag, p2->flag);
1153 }
1154 return 0;
1155 }
1156
1157 return (p1->off + p1->count == p2->off);
1158}
1159
1160static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1161 struct brw_page **pga, int opc,
1162 cksum_type_t cksum_type)
1163{
1164 __u32 cksum;
1165 int i = 0;
1166 struct cfs_crypto_hash_desc *hdesc;
1167 unsigned int bufsize;
1168 int err;
1169 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1170
1171 LASSERT(pg_count > 0);
1172
1173 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1174 if (IS_ERR(hdesc)) {
1175 CERROR("Unable to initialize checksum hash %s\n",
1176 cfs_crypto_hash_name(cfs_alg));
1177 return PTR_ERR(hdesc);
1178 }
1179
1180 while (nob > 0 && pg_count > 0) {
1181 int count = pga[i]->count > nob ? nob : pga[i]->count;
1182
1183
1184
1185 if (i == 0 && opc == OST_READ &&
1186 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1187 unsigned char *ptr = kmap(pga[i]->pg);
1188 int off = pga[i]->off & ~CFS_PAGE_MASK;
1189 memcpy(ptr + off, "bad1", min(4, nob));
1190 kunmap(pga[i]->pg);
1191 }
1192 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1193 pga[i]->off & ~CFS_PAGE_MASK,
1194 count);
1195 CDEBUG(D_PAGE,
1196 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1197 pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1198 (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1199 page_private(pga[i]->pg),
1200 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1201
1202 nob -= pga[i]->count;
1203 pg_count--;
1204 i++;
1205 }
1206
1207 bufsize = 4;
1208 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1209
1210 if (err)
1211 cfs_crypto_hash_final(hdesc, NULL, NULL);
1212
1213
1214
1215 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216 cksum++;
1217
1218 return cksum;
1219}
1220
/*
 * Allocate and pack a bulk read or write request covering page_count pages.
 *
 * cmd:        OBD_BRW_WRITE set selects OST_WRITE (request taken from the
 *             import's request pool); otherwise OST_READ.
 * cli:        client obd supplying the import, checksum settings and grant
 *             state.
 * oa:         caller's obdo; copied into the request body, updated with the
 *             write checksum state, and remembered in the async args.
 * lsm:        not referenced by this function.
 * page_count: number of entries in pga; asserted to be non-zero.
 * pga:        pages to transfer; asserted to be in strictly increasing
 *             offset order, with only the first allowed to start mid-page
 *             and only the last allowed to end mid-page.
 * reqp:       receives the packed request, on success only.
 * ocapa:      capability packed into the request; a reference is stored in
 *             the async args when reserve is non-zero.
 * resend:     non-zero marks the request with OBD_FL_RECOV_RESEND.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, the error from
 * ptlrpc_request_pack(), or the -ENOMEM/-EINVAL codes of the two
 * fail-check hooks at the top.
 */
1221static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1222 struct lov_stripe_md *lsm, obd_count page_count,
1223 struct brw_page **pga,
1224 struct ptlrpc_request **reqp,
1225 struct obd_capa *ocapa, int reserve,
1226 int resend)
1227{
1228 struct ptlrpc_request *req;
1229 struct ptlrpc_bulk_desc *desc;
1230 struct ost_body *body;
1231 struct obd_ioobj *ioobj;
1232 struct niobuf_remote *niobuf;
1233 int niocount, i, requested_nob, opc, rc;
1234 struct osc_brw_async_args *aa;
1235 struct req_capsule *pill;
1236 struct brw_page *pg_prev;
1237
	/* Fail-check hooks: bail out early with a fixed error when armed. */
1238 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1239 return -ENOMEM;
1240 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1241 return -EINVAL;
1242
1243 if ((cmd & OBD_BRW_WRITE) != 0) {
1244 opc = OST_WRITE;
1245 req = ptlrpc_request_alloc_pool(cli->cl_import,
1246 cli->cl_import->imp_rq_pool,
1247 &RQF_OST_BRW_WRITE);
1248 } else {
1249 opc = OST_READ;
1250 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1251 }
1252 if (req == NULL)
1253 return -ENOMEM;
1254
	/*
	 * Count the remote niobufs needed: one for the first page plus one
	 * for every page that cannot be merged with its predecessor.
	 */
1255 for (niocount = i = 1; i < page_count; i++) {
1256 if (!can_merge_pages(pga[i - 1], pga[i]))
1257 niocount++;
1258 }
1259
	/* Size the variable-length request fields before packing. */
1260 pill = &req->rq_pill;
1261 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1262 sizeof(*ioobj));
1263 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1264 niocount * sizeof(*niobuf));
1265 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1266
	/* Packing failed: the request is released with ptlrpc_request_free(). */
1267 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1268 if (rc) {
1269 ptlrpc_request_free(req);
1270 return rc;
1271 }
1272 req->rq_request_portal = OST_IO_PORTAL;
1273 ptlrpc_at_set_req_timeout(req);
1274
1275
	/*
	 * NOTE(review): presumably tells ptlrpc not to retry -EINPROGRESS on
	 * its own, since the BRW paths in this file handle that error code
	 * themselves -- confirm against the ptlrpc layer.
	 */
1276 req->rq_no_retry_einprogress = 1;
1277
	/*
	 * Bulk descriptor for page_count pages: the client is the GET source
	 * for writes and the PUT sink for reads.
	 */
1278 desc = ptlrpc_prep_bulk_imp(req, page_count,
1279 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1280 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1281 OST_BULK_PORTAL);
1282
1283 if (desc == NULL)
1284 GOTO(out, rc = -ENOMEM);
1285
1286
	/* Locate the request fields that were sized above. */
1287 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1288 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1289 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1290 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1291
	/* Copy the caller's obdo into the wire body. */
1292 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1293
1294 obdo_to_ioobj(oa, ioobj);
1295 ioobj->ioo_bufcnt = niocount;
1296
1297
1298
1299
1300
	/* Record the descriptor's bd_md_max_brw value in the ioobj. */
1301 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1302 osc_pack_capa(req, body, ocapa);
1303 LASSERT(page_count > 0);
1304 pg_prev = pga[0];
	/*
	 * Register every page with the bulk descriptor and fill in the remote
	 * niobufs, folding mergeable neighbours into a single entry.
	 */
1305 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1306 struct brw_page *pg = pga[i];
1307 int poff = pg->off & ~CFS_PAGE_MASK;
1308
1309 LASSERT(pg->count > 0);
1310
	/*
	 * Unless there is a single page, the pages must tile: the first must
	 * end on a page boundary, interior pages must be whole pages, and the
	 * last must start on a page boundary.
	 */
1311 LASSERTF(page_count == 1 ||
1312 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1313 ergo(i > 0 && i < page_count - 1,
1314 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1315 ergo(i == page_count - 1, poff == 0)),
1316 "i: %d/%d pg: %p off: %llu, count: %u\n",
1317 i, page_count, pg, pg->off, pg->count);
	/* Offsets must be strictly increasing. */
1318 LASSERTF(i == 0 || pg->off > pg_prev->off,
1319 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1320 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1321 i, page_count,
1322 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1323 pg_prev->pg, page_private(pg_prev->pg),
1324 pg_prev->pg->index, pg_prev->off);
	/* Every page must agree with the first one on OBD_BRW_SRVLOCK. */
1325 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1326 (pg->flag & OBD_BRW_SRVLOCK));
1327
1328 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1329 requested_nob += pg->count;
1330
	/*
	 * A mergeable page extends the previous niobuf: step back onto it
	 * here, and the loop's niobuf++ moves past it again.
	 */
1331 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1332 niobuf--;
1333 niobuf->len += pg->count;
1334 } else {
1335 niobuf->offset = pg->off;
1336 niobuf->len = pg->count;
1337 niobuf->flags = pg->flag;
1338 }
1339 pg_prev = pg;
1340 }
1341
	/* niobuf must now sit exactly niocount entries past the buffer start. */
1342 LASSERTF((void *)(niobuf - niocount) ==
1343 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1344 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1345 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1346
	/* Only writes pass requested_nob to osc_announce_cached(); reads pass 0. */
1347 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	/* Flag resends, initialising o_flags first if it was not valid yet. */
1348 if (resend) {
1349 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1350 body->oa.o_valid |= OBD_MD_FLFLAGS;
1351 body->oa.o_flags = 0;
1352 }
1353 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1354 }
1355
1356 if (osc_should_shrink_grant(cli))
1357 osc_shrink_grant_local(cli, &body->oa);
1358
1359
	/*
	 * Checksums are used when cl_checksum is set and
	 * sptlrpc_flavor_has_bulk() is false (presumably: the security flavor
	 * does not already protect bulk data -- confirm).
	 */
1360 if (opc == OST_WRITE) {
1361 if (cli->cl_checksum &&
1362 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1363
1364
	/*
	 * Snapshot the checksum type so the flags, the computation and the
	 * copy kept in 'oa' below all use the same value.
	 */
1365 cksum_type_t cksum_type = cli->cl_cksum_type;
1366
1367 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1368 oa->o_flags &= OBD_FL_LOCAL_MASK;
1369 body->oa.o_flags = 0;
1370 }
1371 body->oa.o_flags |= cksum_type_pack(cksum_type);
1372 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1373 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1374 page_count, pga,
1375 OST_WRITE,
1376 cksum_type);
1377 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1378 body->oa.o_cksum);
1379
	/* Mirror the checksum state into the caller's obdo as well. */
1380 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381 oa->o_flags |= cksum_type_pack(cksum_type);
1382 } else {
1383
1384
	/* No checksum this time: make sure 'oa' does not claim one. */
1385 oa->o_valid &= ~OBD_MD_FLCKSUM;
1386 }
	/*
	 * NOTE(review): this runs on both branches, so without a checksum it
	 * copies whatever o_cksum the wire obdo already held.
	 */
1387 oa->o_cksum = body->oa.o_cksum;
1388
	/* Reserve reply space in RMF_RCS: one __u32 per remote niobuf. */
1389 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1390 sizeof(__u32) * niocount);
1391 } else {
	/*
	 * Reads only set the checksum type and the OBD_MD_FLCKSUM bit in the
	 * request; no checksum is computed on this path.
	 */
1392 if (cli->cl_checksum &&
1393 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1395 body->oa.o_flags = 0;
1396 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1397 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398 }
1399 }
1400 ptlrpc_request_set_replen(req);
1401
	/* Stash what the reply handlers need in the request's async args. */
1402 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1403 aa = ptlrpc_req_async_args(req);
1404 aa->aa_oa = oa;
1405 aa->aa_requested_nob = requested_nob;
1406 aa->aa_nio_count = niocount;
1407 aa->aa_page_count = page_count;
1408 aa->aa_resends = 0;
1409 aa->aa_ppga = pga;
1410 aa->aa_cli = cli;
1411 INIT_LIST_HEAD(&aa->aa_oaps);
	/*
	 * NOTE(review): aa_ocapa is written only when a reference is taken;
	 * otherwise it keeps whatever the async args already held --
	 * presumably zero for a fresh request, confirm.
	 */
1412 if (ocapa && reserve)
1413 aa->aa_ocapa = capa_get(ocapa);
1414
1415 *reqp = req;
1416 return 0;
1417
	/* Failure after the request was packed: drop our reference to it. */
1418 out:
1419 ptlrpc_req_finished(req);
1420 return rc;
1421}
1422
1423static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424 __u32 client_cksum, __u32 server_cksum, int nob,
1425 obd_count page_count, struct brw_page **pga,
1426 cksum_type_t client_cksum_type)
1427{
1428 __u32 new_cksum;
1429 char *msg;
1430 cksum_type_t cksum_type;
1431
1432 if (server_cksum == client_cksum) {
1433 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1434 return 0;
1435 }
1436
1437 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1438 oa->o_flags : 0);
1439 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440 cksum_type);
1441
1442 if (cksum_type != client_cksum_type)
1443 msg = "the server did not use the checksum type specified in "
1444 "the original request - likely a protocol problem";
1445 else if (new_cksum == server_cksum)
1446 msg = "changed on the client after we checksummed it - "
1447 "likely false positive due to mmap IO (bug 11742)";
1448 else if (new_cksum == client_cksum)
1449 msg = "changed in transit before arrival at OST";
1450 else
1451 msg = "changed in transit AND doesn't match the original - "
1452 "likely false positive due to mmap IO (bug 11742)";
1453
1454 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455 " object "DOSTID" extent [%llu-%llu]\n",
1456 msg, libcfs_nid2str(peer->nid),
1457 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460 POSTID(&oa->o_oi), pga[0]->off,
1461 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463 "client csum now %x\n", client_cksum, client_cksum_type,
1464 server_cksum, cksum_type, new_cksum);
1465 return 1;
1466}
1467
1468
/*
 * Examine the reply to a bulk read/write request and turn it into a result.
 *
 * req: the completed request; its async args (struct osc_brw_async_args)
 *      supply the pages, the byte counts and the caller's obdo.
 * rc:  result of the RPC so far.
 *
 * Returns:
 *  - rc unchanged when it is negative (for -EDQUOT only after the quota
 *    and grant information from the reply has been applied);
 *  - -EPROTO when the reply body cannot be unpacked, a write reports a
 *    positive rc, or a read's byte count exceeds the request or disagrees
 *    with the bulk transfer;
 *  - -EAGAIN when bulk unwrapping fails or a checksum does not match;
 *  - for writes otherwise, the result of check_write_rcs();
 *  - for reads otherwise 0, except in the "checksum requested but not
 *    sent" branch, which leaves rc at the byte count.
 *
 * Whenever the out label is reached with rc >= 0, the reply obdo is copied
 * into aa->aa_oa.
 */
1469static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1470{
1471 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472 const lnet_process_id_t *peer =
1473 &req->rq_import->imp_connection->c_peer;
1474 struct client_obd *cli = aa->aa_cli;
1475 struct ost_body *body;
1476 __u32 client_cksum = 0;
1477
	/* Any failure other than -EDQUOT is returned without reading the reply. */
1478 if (rc < 0 && rc != -EDQUOT) {
1479 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1480 return rc;
1481 }
1482
1483 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1484 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1485 if (body == NULL) {
1486 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1487 return -EPROTO;
1488 }
1489
1490
	/*
	 * For writes whose reply carries user/group quota bits, hand the
	 * uid/gid and the reply's valid/flags words to osc_quota_setdq().
	 */
1491 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1494
1495 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1496 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1497 body->oa.o_flags);
1498 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1499 }
1500
1501 osc_update_grant(cli, body);
1502
	/* Only -EDQUOT can still be negative here; it is returned as is. */
1503 if (rc < 0)
1504 return rc;
1505
	/* Remember the checksum recorded in the caller's obdo, if any. */
1506 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507 client_cksum = aa->aa_oa->o_cksum;
1508
1509 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510 if (rc > 0) {
1511 CERROR("Unexpected +ve rc %d\n", rc);
1512 return -EPROTO;
1513 }
1514 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1515
1516 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517 return -EAGAIN;
1518
	/*
	 * A zero client_cksum skips the comparison; a reported mismatch
	 * (non-zero return) becomes -EAGAIN.
	 */
1519 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520 check_write_checksum(&body->oa, peer, client_cksum,
1521 body->oa.o_cksum, aa->aa_requested_nob,
1522 aa->aa_page_count, aa->aa_ppga,
1523 cksum_type_unpack(aa->aa_oa->o_flags)))
1524 return -EAGAIN;
1525
1526 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1527 aa->aa_page_count, aa->aa_ppga);
1528 GOTO(out, rc);
1529 }
1530
1531
1532
1533
	/* Everything below runs for reads only. */
	/* From here on a non-negative rc is the byte count of the read. */
1534 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1535 if (rc < 0)
1536 GOTO(out, rc = -EAGAIN);
1537
1538 if (rc > aa->aa_requested_nob) {
1539 CERROR("Unexpected rc %d (%d requested)\n", rc,
1540 aa->aa_requested_nob);
1541 return -EPROTO;
1542 }
1543
1544 if (rc != req->rq_bulk->bd_nob_transferred) {
1545 CERROR ("Unexpected rc %d (%d transferred)\n",
1546 rc, req->rq_bulk->bd_nob_transferred);
1547 return (-EPROTO);
1548 }
1549
1550 if (rc < aa->aa_requested_nob)
1551 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1552
1553 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
	/*
	 * Shared across calls: incremented on each match and reset on a
	 * mismatch; not read anywhere else in this function.
	 */
1554 static int cksum_counter;
1555 __u32 server_cksum = body->oa.o_cksum;
1556 char *via;
1557 char *router;
1558 cksum_type_t cksum_type;
1559
	/* Checksum the received pages using the type named in the reply. */
1560 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1561 body->oa.o_flags : 0);
1562 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1563 aa->aa_ppga, OST_READ,
1564 cksum_type);
1565
	/* Name the bulk sender in the message only when it is not the peer. */
1566 if (peer->nid == req->rq_bulk->bd_sender) {
1567 via = router = "";
1568 } else {
1569 via = " via ";
1570 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1571 }
1572
1573 if (server_cksum != client_cksum) {
1574 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1575 "%s%s%s inode "DFID" object "DOSTID
1576 " extent [%llu-%llu]\n",
1577 req->rq_import->imp_obd->obd_name,
1578 libcfs_nid2str(peer->nid),
1579 via, router,
1580 body->oa.o_valid & OBD_MD_FLFID ?
1581 body->oa.o_parent_seq : (__u64)0,
1582 body->oa.o_valid & OBD_MD_FLFID ?
1583 body->oa.o_parent_oid : 0,
1584 body->oa.o_valid & OBD_MD_FLFID ?
1585 body->oa.o_parent_ver : 0,
1586 POSTID(&body->oa.o_oi),
1587 aa->aa_ppga[0]->off,
1588 aa->aa_ppga[aa->aa_page_count-1]->off +
1589 aa->aa_ppga[aa->aa_page_count-1]->count -
1590 1);
1591 CERROR("client %x, server %x, cksum_type %x\n",
1592 client_cksum, server_cksum, cksum_type);
1593 cksum_counter = 0;
	/* Keep the locally computed checksum in the caller's obdo. */
1594 aa->aa_oa->o_cksum = client_cksum;
1595 rc = -EAGAIN;
1596 } else {
1597 cksum_counter++;
1598 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1599 rc = 0;
1600 }
1601 } else if (unlikely(client_cksum)) {
	/*
	 * The caller's obdo carried a checksum but the reply has none.
	 * (x & -x) == x holds only for powers of two, so the message is
	 * logged on the 1st, 2nd, 4th, 8th, ... occurrence.
	 * NOTE(review): rc is not reset here, so the byte count is what
	 * gets returned -- confirm callers expect that.
	 */
1602 static int cksum_missed;
1603
1604 cksum_missed++;
1605 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1606 CERROR("Checksum %u requested from %s but not sent\n",
1607 cksum_missed, libcfs_nid2str(peer->nid));
1608 } else {
1609 rc = 0;
1610 }
1611out:
	/* On success, copy the reply attributes back into the caller's obdo. */
1612 if (rc >= 0)
1613 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1614 aa->aa_oa, &body->oa);
1615
1616 return rc;
1617}
1618
1619static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1620 struct lov_stripe_md *lsm,
1621 obd_count page_count, struct brw_page **pga,
1622 struct obd_capa *ocapa)
1623{
1624 struct ptlrpc_request *req;
1625 int rc;
1626 wait_queue_head_t waitq;
1627 int generation, resends = 0;
1628 struct l_wait_info lwi;
1629
1630 init_waitqueue_head(&waitq);
1631 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1632
1633restart_bulk:
1634 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1635 page_count, pga, &req, ocapa, 0, resends);
1636 if (rc != 0)
1637 return (rc);
1638
1639 if (resends) {
1640 req->rq_generation_set = 1;
1641 req->rq_import_generation = generation;
1642 req->rq_sent = get_seconds() + resends;
1643 }
1644
1645 rc = ptlrpc_queue_wait(req);
1646
1647 if (rc == -ETIMEDOUT && req->rq_resend) {
1648 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1649 ptlrpc_req_finished(req);
1650 goto restart_bulk;
1651 }
1652
1653 rc = osc_brw_fini_request(req, rc);
1654
1655 ptlrpc_req_finished(req);
1656
1657
1658 if (osc_recoverable_error(rc)) {
1659 resends++;
1660 if (rc != -EINPROGRESS &&
1661 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1662 CERROR("%s: too many resend retries for object: "
1663 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1664 POSTID(&oa->o_oi), rc);
1665 goto out;
1666 }
1667 if (generation !=
1668 exp->exp_obd->u.cli.cl_import->imp_generation) {
1669 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1670 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1671 POSTID(&oa->o_oi), rc);
1672 goto out;
1673 }
1674
1675 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1676 NULL);
1677 l_wait_event(waitq, 0, &lwi);
1678
1679 goto restart_bulk;
1680 }
1681out:
1682 if (rc == -EAGAIN || rc == -EINPROGRESS)
1683 rc = -EIO;
1684 return rc;
1685}
1686
1687static int osc_brw_redo_request(struct ptlrpc_request *request,
1688 struct osc_brw_async_args *aa, int rc)
1689{
1690 struct ptlrpc_request *new_req;
1691 struct osc_brw_async_args *new_aa;
1692 struct osc_async_page *oap;
1693
1694 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1695 "redo for recoverable error %d", rc);
1696
1697 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1698 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1699 aa->aa_cli, aa->aa_oa,
1700 NULL ,
1701 aa->aa_page_count, aa->aa_ppga,
1702 &new_req, aa->aa_ocapa, 0, 1);
1703 if (rc)
1704 return rc;
1705
1706 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1707 if (oap->oap_request != NULL) {
1708 LASSERTF(request == oap->oap_request,
1709 "request %p != oap_request %p\n",
1710 request, oap->oap_request);
1711 if (oap->oap_interrupted) {
1712 ptlrpc_req_finished(new_req);
1713 return -EINTR;
1714 }
1715 }
1716 }
1717
1718
1719 aa->aa_resends++;
1720 new_req->rq_interpret_reply = request->rq_interpret_reply;
1721 new_req->rq_async_args = request->rq_async_args;
1722
1723
1724 if (aa->aa_resends > new_req->rq_timeout)
1725 new_req->rq_sent = get_seconds() + new_req->rq_timeout;
1726 else
1727 new_req->rq_sent = get_seconds() + aa->aa_resends;
1728 new_req->rq_generation_set = 1;
1729 new_req->rq_import_generation = request->rq_import_generation;
1730
1731 new_aa = ptlrpc_req_async_args(new_req);
1732
1733 INIT_LIST_HEAD(&new_aa->aa_oaps);
1734 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1735 INIT_LIST_HEAD(&new_aa->aa_exts);
1736 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1737 new_aa->aa_resends = aa->aa_resends;
1738
1739 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1740 if (oap->oap_request) {
1741 ptlrpc_req_finished(oap->oap_request);
1742 oap->oap_request = ptlrpc_request_addref(new_req);
1743 }
1744 }
1745
1746 new_aa->aa_ocapa = aa->aa_ocapa;
1747 aa->aa_ocapa = NULL;
1748
1749
1750
1751
1752
1753 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1754
1755 DEBUG_REQ(D_INFO, new_req, "new request");
1756 return 0;
1757}
1758
1759
1760
1761
1762
1763
1764
1765
1766static void sort_brw_pages(struct brw_page **array, int num)
1767{
1768 int stride, i, j;
1769 struct brw_page *tmp;
1770
1771 if (num == 1)
1772 return;
1773 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1774 ;
1775
1776 do {
1777 stride /= 3;
1778 for (i = stride ; i < num ; i++) {
1779 tmp = array[i];
1780 j = i;
1781 while (j >= stride && array[j - stride]->off > tmp->off) {
1782 array[j] = array[j - stride];
1783 j -= stride;
1784 }
1785 array[j] = tmp;
1786 }
1787 } while (stride > 1);
1788}
1789
1790static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1791{
1792 int count = 1;
1793 int offset;
1794 int i = 0;
1795
1796 LASSERT (pages > 0);
1797 offset = pg[i]->off & ~CFS_PAGE_MASK;
1798
1799 for (;;) {
1800 pages--;
1801 if (pages == 0)
1802 return count;
1803
1804 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1805 return count;
1806
1807 i++;
1808 offset = pg[i]->off & ~CFS_PAGE_MASK;
1809 if (offset != 0)
1810 return count;
1811
1812 count++;
1813 }
1814}
1815
1816static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1817{
1818 struct brw_page **ppga;
1819 int i;
1820
1821 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1822 if (ppga == NULL)
1823 return NULL;
1824
1825 for (i = 0; i < count; i++)
1826 ppga[i] = pga + i;
1827 return ppga;
1828}
1829
1830static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1831{
1832 LASSERT(ppga != NULL);
1833 OBD_FREE(ppga, sizeof(*ppga) * count);
1834}
1835
1836static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1837 obd_count page_count, struct brw_page *pga,
1838 struct obd_trans_info *oti)
1839{
1840 struct obdo *saved_oa = NULL;
1841 struct brw_page **ppga, **orig;
1842 struct obd_import *imp = class_exp2cliimp(exp);
1843 struct client_obd *cli;
1844 int rc, page_count_orig;
1845
1846 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1847 cli = &imp->imp_obd->u.cli;
1848
1849 if (cmd & OBD_BRW_CHECK) {
1850
1851
1852
1853 if (imp->imp_invalid)
1854 return -EIO;
1855 return 0;
1856 }
1857
1858
1859 LASSERT(cli->cl_max_pages_per_rpc);
1860
1861 rc = 0;
1862
1863 orig = ppga = osc_build_ppga(pga, page_count);
1864 if (ppga == NULL)
1865 return -ENOMEM;
1866 page_count_orig = page_count;
1867
1868 sort_brw_pages(ppga, page_count);
1869 while (page_count) {
1870 obd_count pages_per_brw;
1871
1872 if (page_count > cli->cl_max_pages_per_rpc)
1873 pages_per_brw = cli->cl_max_pages_per_rpc;
1874 else
1875 pages_per_brw = page_count;
1876
1877 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1878
1879 if (saved_oa != NULL) {
1880
1881 *oinfo->oi_oa = *saved_oa;
1882 } else if (page_count > pages_per_brw) {
1883
1884 OBDO_ALLOC(saved_oa);
1885 if (saved_oa == NULL)
1886 GOTO(out, rc = -ENOMEM);
1887 *saved_oa = *oinfo->oi_oa;
1888 }
1889
1890 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1891 pages_per_brw, ppga, oinfo->oi_capa);
1892
1893 if (rc != 0)
1894 break;
1895
1896 page_count -= pages_per_brw;
1897 ppga += pages_per_brw;
1898 }
1899
1900out:
1901 osc_release_ppga(orig, page_count_orig);
1902
1903 if (saved_oa != NULL)
1904 OBDO_FREE(saved_oa);
1905
1906 return rc;
1907}
1908
1909static int brw_interpret(const struct lu_env *env,
1910 struct ptlrpc_request *req, void *data, int rc)
1911{
1912 struct osc_brw_async_args *aa = data;
1913 struct osc_extent *ext;
1914 struct osc_extent *tmp;
1915 struct cl_object *obj = NULL;
1916 struct client_obd *cli = aa->aa_cli;
1917
1918 rc = osc_brw_fini_request(req, rc);
1919 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1920
1921
1922 if (osc_recoverable_error(rc)) {
1923 if (req->rq_import_generation !=
1924 req->rq_import->imp_generation) {
1925 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1926 ""DOSTID", rc = %d.\n",
1927 req->rq_import->imp_obd->obd_name,
1928 POSTID(&aa->aa_oa->o_oi), rc);
1929 } else if (rc == -EINPROGRESS ||
1930 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1931 rc = osc_brw_redo_request(req, aa, rc);
1932 } else {
1933 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1934 req->rq_import->imp_obd->obd_name,
1935 POSTID(&aa->aa_oa->o_oi), rc);
1936 }
1937
1938 if (rc == 0)
1939 return 0;
1940 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1941 rc = -EIO;
1942 }
1943
1944 if (aa->aa_ocapa) {
1945 capa_put(aa->aa_ocapa);
1946 aa->aa_ocapa = NULL;
1947 }
1948
1949 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1950 if (obj == NULL && rc == 0) {
1951 obj = osc2cl(ext->oe_obj);
1952 cl_object_get(obj);
1953 }
1954
1955 list_del_init(&ext->oe_link);
1956 osc_extent_finish(env, ext, 1, rc);
1957 }
1958 LASSERT(list_empty(&aa->aa_exts));
1959 LASSERT(list_empty(&aa->aa_oaps));
1960
1961 if (obj != NULL) {
1962 struct obdo *oa = aa->aa_oa;
1963 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1964 unsigned long valid = 0;
1965
1966 LASSERT(rc == 0);
1967 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1968 attr->cat_blocks = oa->o_blocks;
1969 valid |= CAT_BLOCKS;
1970 }
1971 if (oa->o_valid & OBD_MD_FLMTIME) {
1972 attr->cat_mtime = oa->o_mtime;
1973 valid |= CAT_MTIME;
1974 }
1975 if (oa->o_valid & OBD_MD_FLATIME) {
1976 attr->cat_atime = oa->o_atime;
1977 valid |= CAT_ATIME;
1978 }
1979 if (oa->o_valid & OBD_MD_FLCTIME) {
1980 attr->cat_ctime = oa->o_ctime;
1981 valid |= CAT_CTIME;
1982 }
1983 if (valid != 0) {
1984 cl_object_attr_lock(obj);
1985 cl_object_attr_set(env, obj, attr, valid);
1986 cl_object_attr_unlock(obj);
1987 }
1988 cl_object_put(env, obj);
1989 }
1990 OBDO_FREE(aa->aa_oa);
1991
1992 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1993 req->rq_bulk->bd_nob_transferred);
1994 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1995 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1996
1997 client_obd_list_lock(&cli->cl_loi_list_lock);
1998
1999
2000
2001 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2002 cli->cl_w_in_flight--;
2003 else
2004 cli->cl_r_in_flight--;
2005 osc_wake_cache_waiters(cli);
2006 client_obd_list_unlock(&cli->cl_loi_list_lock);
2007
2008 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2009 return rc;
2010}
2011
2012
2013
2014
2015
2016
/*
 * Build a single bulk RPC from the pages of every extent on ext_list and
 * queue it on ptlrpcd.
 *
 * env:      execution environment passed through to the cl_* calls.
 * cli:      client obd the request is built for.
 * ext_list: non-empty list of extents, each asserted to be in state
 *           OES_RPC; the list is empty on return in both the success and
 *           the failure case.
 * cmd:      OBD_BRW_WRITE set selects a write, otherwise a read.
 * pol:      policy value passed to ptlrpcd_add_req().
 *
 * Returns 0 once the request is queued (its reply is then handled by
 * brw_interpret()), or a negative errno. On failure every extent is
 * finished with that error via osc_extent_finish().
 */
2017int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2018 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2019{
2020 struct ptlrpc_request *req = NULL;
2021 struct osc_extent *ext;
2022 struct brw_page **pga = NULL;
2023 struct osc_brw_async_args *aa = NULL;
2024 struct obdo *oa = NULL;
2025 struct osc_async_page *oap;
2026 struct osc_async_page *tmp;
2027 struct cl_req *clerq = NULL;
2028 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2029 CRT_READ;
2030 struct ldlm_lock *lock = NULL;
2031 struct cl_req_attr *crattr = NULL;
2032 obd_off starting_offset = OBD_OBJECT_EOF;
2033 obd_off ending_offset = 0;
2034 int mpflag = 0;
2035 int mem_tight = 0;
2036 int page_count = 0;
2037 int i;
2038 int rc;
2039 LIST_HEAD(rpc_list);
2040
2041 LASSERT(!list_empty(ext_list));
2042
2043
	/*
	 * Gather every page of every extent onto the local rpc_list, counting
	 * them and tracking the lowest start and highest end object offsets.
	 * A page that does not lower the start must begin at page offset 0;
	 * a page that does not raise the end must finish on a page boundary.
	 */
2044 list_for_each_entry(ext, ext_list, oe_link) {
2045 LASSERT(ext->oe_state == OES_RPC);
2046 mem_tight |= ext->oe_memalloc;
2047 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2048 ++page_count;
2049 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2050 if (starting_offset > oap->oap_obj_off)
2051 starting_offset = oap->oap_obj_off;
2052 else
2053 LASSERT(oap->oap_page_off == 0);
2054 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2055 ending_offset = oap->oap_obj_off +
2056 oap->oap_count;
2057 else
2058 LASSERT(oap->oap_page_off + oap->oap_count ==
2059 PAGE_CACHE_SIZE);
2060 }
2061 }
2062
	/*
	 * Some extent had oe_memalloc set. The value saved here is handed
	 * back to cfs_memory_pressure_restore() at the out label.
	 */
2063 if (mem_tight)
2064 mpflag = cfs_memory_pressure_get_and_set();
2065
2066 OBD_ALLOC(crattr, sizeof(*crattr));
2067 if (crattr == NULL)
2068 GOTO(out, rc = -ENOMEM);
2069
2070 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2071 if (pga == NULL)
2072 GOTO(out, rc = -ENOMEM);
2073
2074 OBDO_ALLOC(oa);
2075 if (oa == NULL)
2076 GOTO(out, rc = -ENOMEM);
2077
	/*
	 * Fill the page array. The first page also allocates the cl_req and
	 * supplies the lock recorded for the request.
	 */
2078 i = 0;
2079 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2080 struct cl_page *page = oap2cl_page(oap);
2081 if (clerq == NULL) {
2082 clerq = cl_req_alloc(env, page, crt,
2083 1 );
2084 if (IS_ERR(clerq))
2085 GOTO(out, rc = PTR_ERR(clerq));
2086 lock = oap->oap_ldlm_lock;
2087 }
2088 if (mem_tight)
2089 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2090 pga[i] = &oap->oap_brw_page;
2091 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2092 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2093 pga[i]->pg, page_index(oap->oap_page), oap,
2094 pga[i]->flag);
2095 i++;
2096 cl_req_page_add(env, clerq, page);
2097 }
2098
2099
	/* Fill the obdo through the cl_req, asking for every attribute. */
2100 LASSERT(clerq != NULL);
2101 crattr->cra_oa = oa;
2102 cl_req_attr_set(env, clerq, crattr, ~0ULL);
	/* Pass along the remote handle of the first page's lock, if any. */
2103 if (lock) {
2104 oa->o_handle = lock->l_remote_handle;
2105 oa->o_valid |= OBD_MD_FLHANDLE;
2106 }
2107
2108 rc = cl_req_prep(env, clerq);
2109 if (rc != 0) {
2110 CERROR("cl_req_prep failed: %d\n", rc);
2111 GOTO(out, rc);
2112 }
2113
	/*
	 * Sort the pages by offset and build the request with reserve = 1
	 * and resend = 0. On failure req stays NULL, because
	 * osc_brw_prep_request() stores into *reqp only on success.
	 */
2114 sort_brw_pages(pga, page_count);
2115 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2116 pga, &req, crattr->cra_capa, 1, 0);
2117 if (rc != 0) {
2118 CERROR("prep_req failed: %d\n", rc);
2119 GOTO(out, rc);
2120 }
2121
	/*
	 * From here on, oa and pga are referenced from the request's async
	 * args and are freed by brw_interpret(), not by this function.
	 */
2122 req->rq_interpret_reply = brw_interpret;
2123
2124 if (mem_tight != 0)
2125 req->rq_memalloc = 1;
2126
2127
2128
2129
2130
2131
	/*
	 * Second attribute pass, restricted to the three timestamp flags.
	 * NOTE(review): the reason for refreshing them after the request is
	 * built is not visible in this function -- confirm before changing.
	 */
2132 cl_req_attr_set(env, clerq, crattr,
2133 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2134
2135 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2136
	/*
	 * Move the page and extent lists into the async args; both source
	 * lists (rpc_list and ext_list) are left empty.
	 */
2137 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2138 aa = ptlrpc_req_async_args(req);
2139 INIT_LIST_HEAD(&aa->aa_oaps);
2140 list_splice_init(&rpc_list, &aa->aa_oaps);
2141 INIT_LIST_HEAD(&aa->aa_exts);
2142 list_splice_init(ext_list, &aa->aa_exts);
2143 aa->aa_clerq = clerq;
2144
2145
2146
	/*
	 * Mark the request interrupted if any page was interrupted, and give
	 * the first page on the list a reference on the request.
	 */
2147 tmp = NULL;
2148 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2149
2150 if (tmp == NULL)
2151 tmp = oap;
2152 if (oap->oap_interrupted && !req->rq_intr) {
2153 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2154 oap, req);
2155 ptlrpc_mark_interrupted(req);
2156 }
2157 }
2158 if (tmp != NULL)
2159 tmp->oap_request = ptlrpc_request_addref(req);
2160
	/*
	 * Update the in-flight counters and histograms under the list lock;
	 * starting_offset is shifted down by PAGE_CACHE_SHIFT first.
	 * NOTE(review): the direction test here is cmd == OBD_BRW_READ while
	 * the one at the top is cmd & OBD_BRW_WRITE -- confirm they always
	 * agree for the cmd values callers pass.
	 */
2161 client_obd_list_lock(&cli->cl_loi_list_lock);
2162 starting_offset >>= PAGE_CACHE_SHIFT;
2163 if (cmd == OBD_BRW_READ) {
2164 cli->cl_r_in_flight++;
2165 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2166 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2167 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2168 starting_offset + 1);
2169 } else {
2170 cli->cl_w_in_flight++;
2171 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2172 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2173 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2174 starting_offset + 1);
2175 }
2176 client_obd_list_unlock(&cli->cl_loi_list_lock);
2177
	/* The counters are read here after the lock has been dropped. */
2178 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2179 page_count, aa, cli->cl_r_in_flight,
2180 cli->cl_w_in_flight);
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
	/* Hand the request to ptlrpcd. */
2194 ptlrpcd_add_req(req, pol, -1);
2195 rc = 0;
2196
	/* Shared exit: runs on success and on every failure path. */
2197out:
2198 if (mem_tight != 0)
2199 cfs_memory_pressure_restore(mpflag);
2200
2201 if (crattr != NULL) {
2202 capa_put(crattr->cra_capa);
2203 OBD_FREE(crattr, sizeof(*crattr));
2204 }
2205
	/*
	 * Failure: no request exists, so free what was allocated here,
	 * finish every extent with the error, and complete the cl_req if one
	 * was successfully allocated.
	 */
2206 if (rc != 0) {
2207 LASSERT(req == NULL);
2208
2209 if (oa)
2210 OBDO_FREE(oa);
2211 if (pga)
2212 OBD_FREE(pga, sizeof(*pga) * page_count);
2213
2214
2215 while (!list_empty(ext_list)) {
2216 ext = list_entry(ext_list->next, struct osc_extent,
2217 oe_link);
2218 list_del_init(&ext->oe_link);
2219 osc_extent_finish(env, ext, 0, rc);
2220 }
2221 if (clerq && !IS_ERR(clerq))
2222 cl_req_completion(env, clerq, rc);
2223 }
2224 return rc;
2225}
2226
2227static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2228 struct ldlm_enqueue_info *einfo)
2229{
2230 void *data = einfo->ei_cbdata;
2231 int set = 0;
2232
2233 LASSERT(lock != NULL);
2234 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2235 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2236 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2237 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2238
2239 lock_res_and_lock(lock);
2240 spin_lock(&osc_ast_guard);
2241
2242 if (lock->l_ast_data == NULL)
2243 lock->l_ast_data = data;
2244 if (lock->l_ast_data == data)
2245 set = 1;
2246
2247 spin_unlock(&osc_ast_guard);
2248 unlock_res_and_lock(lock);
2249
2250 return set;
2251}
2252
2253static int osc_set_data_with_check(struct lustre_handle *lockh,
2254 struct ldlm_enqueue_info *einfo)
2255{
2256 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2257 int set = 0;
2258
2259 if (lock != NULL) {
2260 set = osc_set_lock_data_with_check(lock, einfo);
2261 LDLM_LOCK_PUT(lock);
2262 } else
2263 CERROR("lockh %p, data %p - client evicted?\n",
2264 lockh, einfo->ei_cbdata);
2265 return set;
2266}
2267
2268static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2269 ldlm_iterator_t replace, void *data)
2270{
2271 struct ldlm_res_id res_id;
2272 struct obd_device *obd = class_exp2obd(exp);
2273
2274 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2275 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2276 return 0;
2277}
2278
2279
2280
2281
2282
2283static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2284 ldlm_iterator_t replace, void *data)
2285{
2286 struct ldlm_res_id res_id;
2287 struct obd_device *obd = class_exp2obd(exp);
2288 int rc = 0;
2289
2290 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2291 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2292 if (rc == LDLM_ITER_STOP)
2293 return(1);
2294 if (rc == LDLM_ITER_CONTINUE)
2295 return(0);
2296 return(rc);
2297}
2298
2299static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2300 obd_enqueue_update_f upcall, void *cookie,
2301 __u64 *flags, int agl, int rc)
2302{
2303 int intent = *flags & LDLM_FL_HAS_INTENT;
2304
2305 if (intent) {
2306
2307 if (rc == ELDLM_LOCK_ABORTED) {
2308 struct ldlm_reply *rep;
2309 rep = req_capsule_server_get(&req->rq_pill,
2310 &RMF_DLM_REP);
2311
2312 LASSERT(rep != NULL);
2313 rep->lock_policy_res1 =
2314 ptlrpc_status_ntoh(rep->lock_policy_res1);
2315 if (rep->lock_policy_res1)
2316 rc = rep->lock_policy_res1;
2317 }
2318 }
2319
2320 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2321 (rc == 0)) {
2322 *flags |= LDLM_FL_LVB_READY;
2323 CDEBUG(D_INODE,"got kms %llu blocks %llu mtime %llu\n",
2324 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2325 }
2326
2327
2328 rc = (*upcall)(cookie, rc);
2329 return rc;
2330}
2331
2332static int osc_enqueue_interpret(const struct lu_env *env,
2333 struct ptlrpc_request *req,
2334 struct osc_enqueue_args *aa, int rc)
2335{
2336 struct ldlm_lock *lock;
2337 struct lustre_handle handle;
2338 __u32 mode;
2339 struct ost_lvb *lvb;
2340 __u32 lvb_len;
2341 __u64 *flags = aa->oa_flags;
2342
2343
2344
2345 lustre_handle_copy(&handle, aa->oa_lockh);
2346 mode = aa->oa_ei->ei_mode;
2347
2348
2349
2350 lock = ldlm_handle2lock(&handle);
2351
2352
2353
2354
2355
2356 ldlm_lock_addref(&handle, mode);
2357
2358
2359 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2360
2361 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2362 lvb = NULL;
2363 lvb_len = 0;
2364 } else {
2365 lvb = aa->oa_lvb;
2366 lvb_len = sizeof(*aa->oa_lvb);
2367 }
2368
2369
2370 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2371 mode, flags, lvb, lvb_len, &handle, rc);
2372
2373 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2374 flags, aa->oa_agl, rc);
2375
2376 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2377
2378
2379 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2380
2381
2382
2383
2384
2385 ldlm_lock_decref(&handle, mode);
2386
2387 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2388 aa->oa_lockh, req, aa);
2389 ldlm_lock_decref(&handle, mode);
2390 LDLM_LOCK_PUT(lock);
2391 return rc;
2392}
2393
2394void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2395 struct lov_oinfo *loi, __u64 flags,
2396 struct ost_lvb *lvb, __u32 mode, int rc)
2397{
2398 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2399
2400 if (rc == ELDLM_OK) {
2401 __u64 tmp;
2402
2403 LASSERT(lock != NULL);
2404 loi->loi_lvb = *lvb;
2405 tmp = loi->loi_lvb.lvb_size;
2406
2407
2408 if (tmp > lock->l_policy_data.l_extent.end)
2409 tmp = lock->l_policy_data.l_extent.end + 1;
2410 if (tmp >= loi->loi_kms) {
2411 LDLM_DEBUG(lock, "lock acquired, setting rss=%llu, kms=%llu",
2412 loi->loi_lvb.lvb_size, tmp);
2413 loi_kms_set(loi, tmp);
2414 } else {
2415 LDLM_DEBUG(lock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
2416 loi->loi_lvb.lvb_size, loi->loi_kms,
2417 lock->l_policy_data.l_extent.end);
2418 }
2419 ldlm_lock_allow_match(lock);
2420 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2421 LASSERT(lock != NULL);
2422 loi->loi_lvb = *lvb;
2423 ldlm_lock_allow_match(lock);
2424 CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
2425 loi->loi_lvb.lvb_size, loi->loi_kms);
2426 rc = ELDLM_OK;
2427 }
2428
2429 if (lock != NULL) {
2430 if (rc != ELDLM_OK)
2431 ldlm_lock_fail_match(lock);
2432
2433 LDLM_LOCK_PUT(lock);
2434 }
2435}
2436EXPORT_SYMBOL(osc_update_enqueue);
2437
/* Sentinel request-set pointer (never dereferenced as a real set):
 * osc_enqueue_base() compares its rqset argument against this value and, on
 * a match, hands the request to ptlrpcd via ptlrpcd_add_req() instead of
 * adding it to a caller-owned set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2439
2440
2441
2442
2443
2444
2445
2446
/**
 * Enqueue an LDLM lock on an OST object, reusing a cached lock if possible.
 *
 * The requested extent is first widened to page boundaries. If \a kms_valid
 * is set, an existing compatible lock is looked up; when one is found and
 * accepted, \a upcall is invoked directly with ELDLM_OK and no RPC is sent.
 * Otherwise a new enqueue is issued via ldlm_cli_enqueue(). With a non-NULL
 * \a rqset the request is completed asynchronously by
 * osc_enqueue_interpret(); otherwise osc_enqueue_fini() is run inline.
 *
 * \param exp        export to the OST
 * \param res_id     resource name of the object
 * \param flags      in/out LDLM flags; LDLM_FL_HAS_INTENT selects the intent
 *                   path, LDLM_FL_LVB_READY is set on a successful match
 * \param policy     in/out lock extent (page-aligned by this function)
 * \param lvb        buffer receiving the lock value block
 * \param kms_valid  non-zero if matching against cached locks is allowed
 * \param upcall     completion callback, called with \a cookie and a status
 * \param cookie     opaque argument for \a upcall
 * \param einfo      enqueue parameters (lock type and mode)
 * \param lockh      out: handle of the matched or enqueued lock
 * \param rqset      request set for async completion, PTLRPCD_SET to use
 *                   ptlrpcd, or NULL for a synchronous enqueue
 * \param async      passed through to ldlm_cli_enqueue()
 * \param agl        non-zero for an AGL request (as named by the parameter)
 *
 * \return ELDLM_OK on a cache hit, -ECANCELED for an AGL match whose LVB is
 *         not ready, a negative errno on allocation/packing failure, or the
 *         result of ldlm_cli_enqueue() / osc_enqueue_fini()
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Widen the extent to whole pages: round the start down and the end
	 * up (assuming ~CFS_PAGE_MASK selects the in-page offset bits). */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Without a valid kms the cached-lock lookup is skipped entirely. */
	if (!kms_valid)
		goto no_match;

	/* A PR request may also be satisfied by an existing PW lock. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* AGL matched a lock whose LVB is not ready yet:
			 * drop the match and tell the caller to skip. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;

			/* Synchronous caller asked for PR but a PW lock
			 * matched: take the PR reference it expects (the PW
			 * reference from the match is dropped below). */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* intentionally empty */
			}

			/* The lock is already held: report success to the
			 * caller right away. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* Async callers do not keep the reference
				 * taken by the match. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		} else {
			/* The matched lock was rejected by the data check:
			 * release it and enqueue a new one. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

 no_match:
	if (intent) {
		/* Intent enqueues use a pre-built request whose reply has
		 * room for the LVB. */
		LIST_HEAD(cancels);
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* LDLM_FL_BLOCK_GRANTED is never passed on to the enqueue. */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			/* Stash everything osc_enqueue_interpret() needs in
			 * the request's async-args area. */
			struct osc_enqueue_args *aa;
			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags  = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb    = lvb;
			aa->oa_lockh  = lockh;
			aa->oa_agl    = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			/* The intent request was allocated above and must be
			 * released here on failure. */
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	/* Synchronous path: complete the enqueue and run the upcall now. */
	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}
2590
2591static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2592 struct ldlm_enqueue_info *einfo,
2593 struct ptlrpc_request_set *rqset)
2594{
2595 struct ldlm_res_id res_id;
2596 int rc;
2597
2598 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2599 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2600 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2601 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2602 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2603 rqset, rqset != NULL, 0);
2604 return rc;
2605}
2606
2607int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2608 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2609 __u64 *flags, void *data, struct lustre_handle *lockh,
2610 int unref)
2611{
2612 struct obd_device *obd = exp->exp_obd;
2613 __u64 lflags = *flags;
2614 ldlm_mode_t rc;
2615
2616 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2617 return -EIO;
2618
2619
2620
2621 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2622 policy->l_extent.end |= ~CFS_PAGE_MASK;
2623
2624
2625
2626
2627
2628 rc = mode;
2629 if (mode == LCK_PR)
2630 rc |= LCK_PW;
2631 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2632 res_id, type, policy, rc, lockh, unref);
2633 if (rc) {
2634 if (data != NULL) {
2635 if (!osc_set_data_with_check(lockh, data)) {
2636 if (!(lflags & LDLM_FL_TEST_LOCK))
2637 ldlm_lock_decref(lockh, rc);
2638 return 0;
2639 }
2640 }
2641 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2642 ldlm_lock_addref(lockh, LCK_PR);
2643 ldlm_lock_decref(lockh, LCK_PW);
2644 }
2645 return rc;
2646 }
2647 return rc;
2648}
2649
2650int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2651{
2652 if (unlikely(mode == LCK_GROUP))
2653 ldlm_lock_decref_and_cancel(lockh, mode);
2654 else
2655 ldlm_lock_decref(lockh, mode);
2656
2657 return 0;
2658}
2659
/**
 * obd_ops entry point for releasing a lock reference.
 *
 * Thin wrapper around osc_cancel_base(); \a exp and \a md are not used.
 *
 * \return the result of osc_cancel_base()
 */
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
		      __u32 mode, struct lustre_handle *lockh)
{
	return osc_cancel_base(lockh, mode);
}
2665
2666static int osc_cancel_unused(struct obd_export *exp,
2667 struct lov_stripe_md *lsm,
2668 ldlm_cancel_flags_t flags,
2669 void *opaque)
2670{
2671 struct obd_device *obd = class_exp2obd(exp);
2672 struct ldlm_res_id res_id, *resp = NULL;
2673
2674 if (lsm != NULL) {
2675 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2676 resp = &res_id;
2677 }
2678
2679 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2680}
2681
2682static int osc_statfs_interpret(const struct lu_env *env,
2683 struct ptlrpc_request *req,
2684 struct osc_async_args *aa, int rc)
2685{
2686 struct obd_statfs *msfs;
2687
2688 if (rc == -EBADR)
2689
2690
2691
2692
2693
2694 return rc;
2695
2696 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2697 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2698 GOTO(out, rc = 0);
2699
2700 if (rc != 0)
2701 GOTO(out, rc);
2702
2703 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2704 if (msfs == NULL) {
2705 GOTO(out, rc = -EPROTO);
2706 }
2707
2708 *aa->aa_oi->oi_osfs = *msfs;
2709out:
2710 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2711 return rc;
2712}
2713
2714static int osc_statfs_async(struct obd_export *exp,
2715 struct obd_info *oinfo, __u64 max_age,
2716 struct ptlrpc_request_set *rqset)
2717{
2718 struct obd_device *obd = class_exp2obd(exp);
2719 struct ptlrpc_request *req;
2720 struct osc_async_args *aa;
2721 int rc;
2722
2723
2724
2725
2726
2727
2728
2729 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2730 if (req == NULL)
2731 return -ENOMEM;
2732
2733 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2734 if (rc) {
2735 ptlrpc_request_free(req);
2736 return rc;
2737 }
2738 ptlrpc_request_set_replen(req);
2739 req->rq_request_portal = OST_CREATE_PORTAL;
2740 ptlrpc_at_set_req_timeout(req);
2741
2742 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2743
2744 req->rq_no_resend = 1;
2745 req->rq_no_delay = 1;
2746 }
2747
2748 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2749 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2750 aa = ptlrpc_req_async_args(req);
2751 aa->aa_oi = oinfo;
2752
2753 ptlrpc_set_add_req(rqset, req);
2754 return 0;
2755}
2756
2757static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2758 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2759{
2760 struct obd_device *obd = class_exp2obd(exp);
2761 struct obd_statfs *msfs;
2762 struct ptlrpc_request *req;
2763 struct obd_import *imp = NULL;
2764 int rc;
2765
2766
2767
2768 down_read(&obd->u.cli.cl_sem);
2769 if (obd->u.cli.cl_import)
2770 imp = class_import_get(obd->u.cli.cl_import);
2771 up_read(&obd->u.cli.cl_sem);
2772 if (!imp)
2773 return -ENODEV;
2774
2775
2776
2777
2778
2779
2780
2781 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2782
2783 class_import_put(imp);
2784
2785 if (req == NULL)
2786 return -ENOMEM;
2787
2788 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2789 if (rc) {
2790 ptlrpc_request_free(req);
2791 return rc;
2792 }
2793 ptlrpc_request_set_replen(req);
2794 req->rq_request_portal = OST_CREATE_PORTAL;
2795 ptlrpc_at_set_req_timeout(req);
2796
2797 if (flags & OBD_STATFS_NODELAY) {
2798
2799 req->rq_no_resend = 1;
2800 req->rq_no_delay = 1;
2801 }
2802
2803 rc = ptlrpc_queue_wait(req);
2804 if (rc)
2805 GOTO(out, rc);
2806
2807 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2808 if (msfs == NULL) {
2809 GOTO(out, rc = -EPROTO);
2810 }
2811
2812 *osfs = *msfs;
2813
2814 out:
2815 ptlrpc_req_finished(req);
2816 return rc;
2817}
2818
2819
2820
2821
2822
2823
2824
2825static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2826{
2827
2828 struct lov_user_md_v3 lum, *lumk;
2829 struct lov_user_ost_data_v1 *lmm_objects;
2830 int rc = 0, lum_size;
2831
2832 if (!lsm)
2833 return -ENODATA;
2834
2835
2836
2837 lum_size = sizeof(struct lov_user_md_v1);
2838 if (copy_from_user(&lum, lump, lum_size))
2839 return -EFAULT;
2840
2841 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2842 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2843 return -EINVAL;
2844
2845
2846 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2847 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2848 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2849
2850
2851
2852 if (lum.lmm_stripe_count > 0) {
2853 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2854 OBD_ALLOC(lumk, lum_size);
2855 if (!lumk)
2856 return -ENOMEM;
2857
2858 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2859 lmm_objects =
2860 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2861 else
2862 lmm_objects = &(lumk->lmm_objects[0]);
2863 lmm_objects->l_ost_oi = lsm->lsm_oi;
2864 } else {
2865 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2866 lumk = &lum;
2867 }
2868
2869 lumk->lmm_oi = lsm->lsm_oi;
2870 lumk->lmm_stripe_count = 1;
2871
2872 if (copy_to_user(lump, lumk, lum_size))
2873 rc = -EFAULT;
2874
2875 if (lumk != &lum)
2876 OBD_FREE(lumk, lum_size);
2877
2878 return rc;
2879}
2880
2881
2882static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2883 void *karg, void *uarg)
2884{
2885 struct obd_device *obd = exp->exp_obd;
2886 struct obd_ioctl_data *data = karg;
2887 int err = 0;
2888
2889 if (!try_module_get(THIS_MODULE)) {
2890 CERROR("Can't get module. Is it alive?");
2891 return -EINVAL;
2892 }
2893 switch (cmd) {
2894 case OBD_IOC_LOV_GET_CONFIG: {
2895 char *buf;
2896 struct lov_desc *desc;
2897 struct obd_uuid uuid;
2898
2899 buf = NULL;
2900 len = 0;
2901 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2902 GOTO(out, err = -EINVAL);
2903
2904 data = (struct obd_ioctl_data *)buf;
2905
2906 if (sizeof(*desc) > data->ioc_inllen1) {
2907 obd_ioctl_freedata(buf, len);
2908 GOTO(out, err = -EINVAL);
2909 }
2910
2911 if (data->ioc_inllen2 < sizeof(uuid)) {
2912 obd_ioctl_freedata(buf, len);
2913 GOTO(out, err = -EINVAL);
2914 }
2915
2916 desc = (struct lov_desc *)data->ioc_inlbuf1;
2917 desc->ld_tgt_count = 1;
2918 desc->ld_active_tgt_count = 1;
2919 desc->ld_default_stripe_count = 1;
2920 desc->ld_default_stripe_size = 0;
2921 desc->ld_default_stripe_offset = 0;
2922 desc->ld_pattern = 0;
2923 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2924
2925 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2926
2927 err = copy_to_user((void *)uarg, buf, len);
2928 if (err)
2929 err = -EFAULT;
2930 obd_ioctl_freedata(buf, len);
2931 GOTO(out, err);
2932 }
2933 case LL_IOC_LOV_SETSTRIPE:
2934 err = obd_alloc_memmd(exp, karg);
2935 if (err > 0)
2936 err = 0;
2937 GOTO(out, err);
2938 case LL_IOC_LOV_GETSTRIPE:
2939 err = osc_getstripe(karg, uarg);
2940 GOTO(out, err);
2941 case OBD_IOC_CLIENT_RECOVER:
2942 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2943 data->ioc_inlbuf1, 0);
2944 if (err > 0)
2945 err = 0;
2946 GOTO(out, err);
2947 case IOC_OSC_SET_ACTIVE:
2948 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2949 data->ioc_offset);
2950 GOTO(out, err);
2951 case OBD_IOC_POLL_QUOTACHECK:
2952 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2953 GOTO(out, err);
2954 case OBD_IOC_PING_TARGET:
2955 err = ptlrpc_obd_ping(obd);
2956 GOTO(out, err);
2957 default:
2958 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2959 cmd, current_comm());
2960 GOTO(out, err = -ENOTTY);
2961 }
2962out:
2963 module_put(THIS_MODULE);
2964 return err;
2965}
2966
/**
 * obd_ops get_info handler: answer a keyed query about the OSC/OST.
 *
 * Supported keys (tested with KEY_IS(), which presumably compares against
 * \a key / \a keylen):
 *  - KEY_LOCK_TO_STRIPE: always answers stripe 0;
 *  - KEY_LAST_ID: asks the OST for the last object id (sent without delay
 *    or resend) and stores it in \a val;
 *  - KEY_FIEMAP: forwards a fiemap request to the OST, optionally under a
 *    matched extent lock, and copies the reply into \a val.
 *
 * \param keylen  length of \a key
 * \param key     query key (for KEY_FIEMAP a struct ll_fiemap_info_key)
 * \param vallen  in/out length of \a val
 * \param val     buffer receiving (and for fiemap also providing) data
 * \param lsm     unused here
 *
 * \return 0 on success, -EFAULT if \a vallen or \a val is NULL, -EINVAL for
 *         an unknown key, -ENOMEM / -EPROTO or the RPC error otherwise
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			obd_count keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		/* An OSC object has a single stripe: the answer is 0. */
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		obd_id		*reply;
		char		  *tmp;
		int		    rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		/* Fail fast instead of waiting for / resending to the OST. */
		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			GOTO(out, rc);

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL)
			GOTO(out, rc = -EPROTO);

		/* NOTE(review): *vallen is not checked against
		 * sizeof(obd_id) before this store -- verify callers. */
		*((obd_id *)val) = *reply;
	out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id	 res_id;
		ldlm_policy_data_t	 policy;
		struct lustre_handle	 lockh;
		ldlm_mode_t		 mode = 0;
		struct ptlrpc_request	*req;
		struct ll_user_fiemap	*reply;
		char			*tmp;
		int			 rc;

		/* Locking is only attempted for FIEMAP_FLAG_SYNC requests. */
		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		/* Clamp the extent end to OBD_OBJECT_EOF when start + length
		 * (rounded up to a page) would reach or pass it. */
		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
				fm_key->fiemap.fm_length +
				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			/* Hold the lock as PR regardless of what matched. */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, needs acquire lock on server side */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL)
			GOTO(drop_lock, rc = -ENOMEM);

		/* The fiemap value travels in both directions with the same
		 * size, *vallen. */
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			GOTO(drop_lock, rc);
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			GOTO(fini_req, rc);

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL)
			GOTO(fini_req, rc = -EPROTO);

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		/* mode is non-zero only if a lock was matched above. */
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}
3101
/**
 * obd_ops set_info_async handler: apply a keyed setting.
 *
 * Keys handled locally, without an RPC (tested with KEY_IS(), which
 * presumably compares against \a key / \a keylen):
 *  - KEY_CHECKSUM: enable/disable checksums (value must be an int);
 *  - KEY_SPTLRPC_CONF: re-adapt the security configuration;
 *  - KEY_FLUSH_CTX: flush the caller's security context on the import;
 *  - KEY_CACHE_SET: attach this OSC to a shared client cache;
 *  - KEY_CACHE_LRU_SHRINK: shrink this OSC's LRU and report progress by
 *    decrementing the int at \a val.
 * Every other key is sent to the OST as OST_SET_INFO. KEY_GRANT_SHRINK
 * requests are handed to ptlrpcd; all others are added to \a set, which is
 * therefore mandatory for them.
 *
 * \return 0 on success, -EINVAL for a bad value size or a missing \a set,
 *         -ENOMEM on allocation failure, or the request packing error
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      obd_count keylen, void *key, obd_count vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device     *obd = exp->exp_obd;
	struct obd_import     *imp = class_exp2cliimp(exp);
	char		  *tmp;
	int		    rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		/* Normalise any non-zero value to 1. */
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		/* The cache may only be attached once; take a user count
		 * on it and share its LRU budget counter. */
		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = (struct cl_client_cache *)val;
		atomic_inc(&cli->cl_cache->ccc_users);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		/* Shrink at most half of the entries currently in the list,
		 * and no more than the caller asked for. */
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

		nr = osc_lru_shrink(cli, min(nr, target));
		*(int *)val -= nr;
		return 0;
	}

	/* Only grant-shrink requests may be sent without a request set. */
	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

	/* Grant-shrink requests use their own request format. */
	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
						&RQF_OST_SET_GRANT_INFO :
						&RQF_OBD_SET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	/* The value goes into the OST body for grant shrink, into the
	 * set-info value field otherwise. */
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_grant_args *aa;
		struct obdo *oa;

		/* Keep a private copy of the obdo for the interpreter. */
		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		OBDO_ALLOC(oa);
		if (!oa) {
			ptlrpc_req_finished(req);
			return -ENOMEM;
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

	return 0;
}
3218
3219
/**
 * obd_ops llog_init handler.
 *
 * This entry point must never be called for an OSC device: it triggers
 * LBUG() unconditionally. The return statement only satisfies the
 * function signature.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
			 struct obd_device *disk_obd, int *index)
{
	LBUG();
	return 0;
}
3228
3229static int osc_llog_finish(struct obd_device *obd, int count)
3230{
3231 struct llog_ctxt *ctxt;
3232
3233 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3234 if (ctxt) {
3235 llog_cat_close(NULL, ctxt->loc_handle);
3236 llog_cleanup(NULL, ctxt);
3237 }
3238
3239 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3240 if (ctxt)
3241 llog_cleanup(NULL, ctxt);
3242 return 0;
3243}
3244
3245static int osc_reconnect(const struct lu_env *env,
3246 struct obd_export *exp, struct obd_device *obd,
3247 struct obd_uuid *cluuid,
3248 struct obd_connect_data *data,
3249 void *localdata)
3250{
3251 struct client_obd *cli = &obd->u.cli;
3252
3253 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3254 long lost_grant;
3255
3256 client_obd_list_lock(&cli->cl_loi_list_lock);
3257 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3258 2 * cli_brw_size(obd);
3259 lost_grant = cli->cl_lost_grant;
3260 cli->cl_lost_grant = 0;
3261 client_obd_list_unlock(&cli->cl_loi_list_lock);
3262
3263 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3264 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3265 data->ocd_version, data->ocd_grant, lost_grant);
3266 }
3267
3268 return 0;
3269}
3270
3271static int osc_disconnect(struct obd_export *exp)
3272{
3273 struct obd_device *obd = class_exp2obd(exp);
3274 struct llog_ctxt *ctxt;
3275 int rc;
3276
3277 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3278 if (ctxt) {
3279 if (obd->u.cli.cl_conn_count == 1) {
3280
3281
3282 llog_sync(ctxt, exp, 0);
3283 }
3284 llog_ctxt_put(ctxt);
3285 } else {
3286 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3287 obd);
3288 }
3289
3290 rc = client_disconnect_export(exp);
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308 if (obd->u.cli.cl_import == NULL)
3309 osc_del_shrink_grant(&obd->u.cli);
3310 return rc;
3311}
3312
3313static int osc_import_event(struct obd_device *obd,
3314 struct obd_import *imp,
3315 enum obd_import_event event)
3316{
3317 struct client_obd *cli;
3318 int rc = 0;
3319
3320 LASSERT(imp->imp_obd == obd);
3321
3322 switch (event) {
3323 case IMP_EVENT_DISCON: {
3324 cli = &obd->u.cli;
3325 client_obd_list_lock(&cli->cl_loi_list_lock);
3326 cli->cl_avail_grant = 0;
3327 cli->cl_lost_grant = 0;
3328 client_obd_list_unlock(&cli->cl_loi_list_lock);
3329 break;
3330 }
3331 case IMP_EVENT_INACTIVE: {
3332 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3333 break;
3334 }
3335 case IMP_EVENT_INVALIDATE: {
3336 struct ldlm_namespace *ns = obd->obd_namespace;
3337 struct lu_env *env;
3338 int refcheck;
3339
3340 env = cl_env_get(&refcheck);
3341 if (!IS_ERR(env)) {
3342
3343 cli = &obd->u.cli;
3344
3345
3346 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3347
3348 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3349 cl_env_put(env, &refcheck);
3350 } else
3351 rc = PTR_ERR(env);
3352 break;
3353 }
3354 case IMP_EVENT_ACTIVE: {
3355 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3356 break;
3357 }
3358 case IMP_EVENT_OCD: {
3359 struct obd_connect_data *ocd = &imp->imp_connect_data;
3360
3361 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3362 osc_init_grant(&obd->u.cli, ocd);
3363
3364
3365 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3366 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3367
3368 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3369 break;
3370 }
3371 case IMP_EVENT_DEACTIVATE: {
3372 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3373 break;
3374 }
3375 case IMP_EVENT_ACTIVATE: {
3376 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3377 break;
3378 }
3379 default:
3380 CERROR("Unknown import event %d\n", event);
3381 LBUG();
3382 }
3383 return rc;
3384}
3385
3386
3387
3388
3389
3390
3391
3392
3393static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3394{
3395 check_res_locked(lock->l_resource);
3396
3397
3398
3399
3400
3401
3402
3403 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3404 (lock->l_granted_mode == LCK_PR ||
3405 lock->l_granted_mode == LCK_CR) &&
3406 (osc_dlm_lock_pageref(lock) == 0))
3407 return 1;
3408
3409 return 0;
3410}
3411
3412static int brw_queue_work(const struct lu_env *env, void *data)
3413{
3414 struct client_obd *cli = data;
3415
3416 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3417
3418 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3419 return 0;
3420}
3421
3422int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3423{
3424 struct lprocfs_static_vars lvars = { NULL };
3425 struct client_obd *cli = &obd->u.cli;
3426 void *handler;
3427 int rc;
3428
3429 rc = ptlrpcd_addref();
3430 if (rc)
3431 return rc;
3432
3433 rc = client_obd_setup(obd, lcfg);
3434 if (rc)
3435 GOTO(out_ptlrpcd, rc);
3436
3437 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3438 if (IS_ERR(handler))
3439 GOTO(out_client_setup, rc = PTR_ERR(handler));
3440 cli->cl_writeback_work = handler;
3441
3442 rc = osc_quota_setup(obd);
3443 if (rc)
3444 GOTO(out_ptlrpcd_work, rc);
3445
3446 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3447 lprocfs_osc_init_vars(&lvars);
3448 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3449 lproc_osc_attach_seqstat(obd);
3450 sptlrpc_lprocfs_cliobd_attach(obd);
3451 ptlrpc_lprocfs_register_obd(obd);
3452 }
3453
3454
3455
3456
3457
3458
3459 cli->cl_import->imp_rq_pool =
3460 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3461 OST_MAXREQSIZE,
3462 ptlrpc_add_rqs_to_pool);
3463
3464 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3465 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3466 return rc;
3467
3468out_ptlrpcd_work:
3469 ptlrpcd_destroy_work(handler);
3470out_client_setup:
3471 client_obd_cleanup(obd);
3472out_ptlrpcd:
3473 ptlrpcd_decref();
3474 return rc;
3475}
3476
3477static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3478{
3479 int rc = 0;
3480
3481 switch (stage) {
3482 case OBD_CLEANUP_EARLY: {
3483 struct obd_import *imp;
3484 imp = obd->u.cli.cl_import;
3485 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3486
3487 ptlrpc_deactivate_import(imp);
3488 spin_lock(&imp->imp_lock);
3489 imp->imp_pingable = 0;
3490 spin_unlock(&imp->imp_lock);
3491 break;
3492 }
3493 case OBD_CLEANUP_EXPORTS: {
3494 struct client_obd *cli = &obd->u.cli;
3495
3496
3497
3498
3499
3500
3501
3502
3503
3504 obd_zombie_barrier();
3505 if (cli->cl_writeback_work) {
3506 ptlrpcd_destroy_work(cli->cl_writeback_work);
3507 cli->cl_writeback_work = NULL;
3508 }
3509 obd_cleanup_client_import(obd);
3510 ptlrpc_lprocfs_unregister_obd(obd);
3511 lprocfs_obd_cleanup(obd);
3512 rc = obd_llog_finish(obd, 0);
3513 if (rc != 0)
3514 CERROR("failed to cleanup llogging subsystems\n");
3515 break;
3516 }
3517 }
3518 return rc;
3519}
3520
3521int osc_cleanup(struct obd_device *obd)
3522{
3523 struct client_obd *cli = &obd->u.cli;
3524 int rc;
3525
3526
3527 if (cli->cl_cache != NULL) {
3528 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3529 spin_lock(&cli->cl_cache->ccc_lru_lock);
3530 list_del_init(&cli->cl_lru_osc);
3531 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3532 cli->cl_lru_left = NULL;
3533 atomic_dec(&cli->cl_cache->ccc_users);
3534 cli->cl_cache = NULL;
3535 }
3536
3537
3538 osc_quota_cleanup(obd);
3539
3540 rc = client_obd_cleanup(obd);
3541
3542 ptlrpcd_decref();
3543 return rc;
3544}
3545
3546int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3547{
3548 struct lprocfs_static_vars lvars = { NULL };
3549 int rc = 0;
3550
3551 lprocfs_osc_init_vars(&lvars);
3552
3553 switch (lcfg->lcfg_command) {
3554 default:
3555 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3556 lcfg, obd);
3557 if (rc > 0)
3558 rc = 0;
3559 break;
3560 }
3561
3562 return(rc);
3563}
3564
/**
 * obd_ops process_config handler.
 *
 * Thin wrapper: \a buf is passed to osc_process_config_base() as the
 * configuration record; \a len is not used.
 */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
	return osc_process_config_base(obd, buf);
}
3569
/* OBD method table of the OSC device type; registered with the class layer
 * by osc_init() through class_register_type(). Connection management
 * (add/del conn, connect) uses the generic client_* helpers, everything
 * else is implemented by osc_* functions. */
struct obd_ops osc_obd_ops = {
	.o_owner		= THIS_MODULE,
	.o_setup		= osc_setup,
	.o_precleanup	   = osc_precleanup,
	.o_cleanup	      = osc_cleanup,
	.o_add_conn	     = client_import_add_conn,
	.o_del_conn	     = client_import_del_conn,
	.o_connect	      = client_connect_import,
	.o_reconnect	    = osc_reconnect,
	.o_disconnect	   = osc_disconnect,
	.o_statfs	       = osc_statfs,
	.o_statfs_async	 = osc_statfs_async,
	.o_packmd	       = osc_packmd,
	.o_unpackmd	     = osc_unpackmd,
	.o_create	       = osc_create,
	.o_destroy	      = osc_destroy,
	.o_getattr	      = osc_getattr,
	.o_getattr_async	= osc_getattr_async,
	.o_setattr	      = osc_setattr,
	.o_setattr_async	= osc_setattr_async,
	.o_brw		  = osc_brw,
	.o_punch		= osc_punch,
	.o_sync		 = osc_sync,
	.o_enqueue	      = osc_enqueue,
	.o_change_cbdata	= osc_change_cbdata,
	.o_find_cbdata	  = osc_find_cbdata,
	.o_cancel	       = osc_cancel,
	.o_cancel_unused	= osc_cancel_unused,
	.o_iocontrol	    = osc_iocontrol,
	.o_get_info	     = osc_get_info,
	.o_set_info_async       = osc_set_info_async,
	.o_import_event	 = osc_import_event,
	.o_llog_init	    = osc_llog_init,
	.o_llog_finish	  = osc_llog_finish,
	.o_process_config       = osc_process_config,
	.o_quotactl	     = osc_quotactl,
	.o_quotacheck	   = osc_quotacheck,
};
3608
/* Defined elsewhere in the OSC module; used by osc_init() / osc_exit():
 * the kmem cache descriptors, and the AST guard spinlock together with its
 * lockdep class. */
extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;
3612
3613int __init osc_init(void)
3614{
3615 struct lprocfs_static_vars lvars = { NULL };
3616 int rc;
3617
3618
3619
3620
3621 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3622
3623 rc = lu_kmem_init(osc_caches);
3624 if (rc)
3625 return rc;
3626
3627 lprocfs_osc_init_vars(&lvars);
3628
3629 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3630 LUSTRE_OSC_NAME, &osc_device_type);
3631 if (rc) {
3632 lu_kmem_fini(osc_caches);
3633 return rc;
3634 }
3635
3636 spin_lock_init(&osc_ast_guard);
3637 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3638
3639 return rc;
3640}
3641
/**
 * Module exit: undo osc_init() in reverse order -- unregister the OSC
 * device type, then release the kmem caches.
 */
static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
}
3647
/* Module metadata and the load/unload entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);
3655