1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43#define DEBUG_SUBSYSTEM S_LOV
44#include <linux/libcfs/libcfs.h>
45
46#include <obd_support.h>
47#include <lustre_lib.h>
48#include <lustre_net.h>
49#include <lustre/lustre_idl.h>
50#include <lustre_dlm.h>
51#include <lustre_mds.h>
52#include <obd_class.h>
53#include <obd_lov.h>
54#include <obd_ost.h>
55#include <lprocfs_status.h>
56#include <lustre_log.h>
57
58#include "lov_internal.h"
59
60
61
62
63
64
65static int lov_llog_origin_add(const struct lu_env *env,
66 struct llog_ctxt *ctxt,
67 struct llog_rec_hdr *rec,
68 struct lov_stripe_md *lsm,
69 struct llog_cookie *logcookies, int numcookies)
70{
71 struct obd_device *obd = ctxt->loc_obd;
72 struct lov_obd *lov = &obd->u.lov;
73 int i, rc = 0, cookies = 0;
74 ENTRY;
75
76 LASSERTF(logcookies && numcookies >= lsm->lsm_stripe_count,
77 "logcookies %p, numcookies %d lsm->lsm_stripe_count %d \n",
78 logcookies, numcookies, lsm->lsm_stripe_count);
79
80 for (i = 0; i < lsm->lsm_stripe_count; i++) {
81 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
82 struct obd_device *child =
83 lov->lov_tgts[loi->loi_ost_idx]->ltd_exp->exp_obd;
84 struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
85
86
87 switch (rec->lrh_type) {
88 case MDS_UNLINK_REC: {
89 struct llog_unlink_rec *lur = (struct llog_unlink_rec *)rec;
90 lur->lur_oid = ostid_id(&loi->loi_oi);
91 lur->lur_oseq = (__u32)ostid_seq(&loi->loi_oi);
92 break;
93 }
94 case MDS_SETATTR64_REC: {
95 struct llog_setattr64_rec *lsr = (struct llog_setattr64_rec *)rec;
96 lsr->lsr_oi = loi->loi_oi;
97 break;
98 }
99 default:
100 break;
101 }
102
103
104 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FAIL_LOV_LOG_ADD)) {
105 llog_ctxt_put(cctxt);
106 cctxt = NULL;
107 }
108 rc = llog_obd_add(env, cctxt, rec, NULL, logcookies + cookies,
109 numcookies - cookies);
110 llog_ctxt_put(cctxt);
111 if (rc < 0) {
112 CERROR("Can't add llog (rc = %d) for stripe %d\n",
113 rc, cookies);
114 memset(logcookies + cookies, 0,
115 sizeof(struct llog_cookie));
116 rc = 1;
117 }
118
119 cookies += rc;
120 }
121 RETURN(cookies);
122}
123
124static int lov_llog_origin_connect(struct llog_ctxt *ctxt,
125 struct llog_logid *logid,
126 struct llog_gen *gen,
127 struct obd_uuid *uuid)
128{
129 struct obd_device *obd = ctxt->loc_obd;
130 struct lov_obd *lov = &obd->u.lov;
131 int i, rc = 0, err = 0;
132 ENTRY;
133
134 obd_getref(obd);
135 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
136 struct obd_device *child;
137 struct llog_ctxt *cctxt;
138
139 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
140 continue;
141 if (uuid && !obd_uuid_equals(uuid, &lov->lov_tgts[i]->ltd_uuid))
142 continue;
143 CDEBUG(D_CONFIG, "connect %d/%d\n", i, lov->desc.ld_tgt_count);
144 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
145 cctxt = llog_get_context(child, ctxt->loc_idx);
146 rc = llog_connect(cctxt, logid, gen, uuid);
147 llog_ctxt_put(cctxt);
148
149 if (rc) {
150 CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
151 if (!err)
152 err = rc;
153 }
154 }
155 obd_putref(obd);
156
157 RETURN(err);
158}
159
160
161static int lov_llog_repl_cancel(const struct lu_env *env,
162 struct llog_ctxt *ctxt,
163 struct lov_stripe_md *lsm,
164 int count, struct llog_cookie *cookies,
165 int flags)
166{
167 struct lov_obd *lov;
168 struct obd_device *obd = ctxt->loc_obd;
169 int rc = 0, i;
170 ENTRY;
171
172 LASSERT(lsm != NULL);
173 LASSERT(count == lsm->lsm_stripe_count);
174
175 lov = &obd->u.lov;
176 obd_getref(obd);
177 for (i = 0; i < count; i++, cookies++) {
178 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
179 struct obd_device *child =
180 lov->lov_tgts[loi->loi_ost_idx]->ltd_exp->exp_obd;
181 struct llog_ctxt *cctxt =
182 llog_get_context(child, ctxt->loc_idx);
183 int err;
184
185 err = llog_cancel(env, cctxt, NULL, 1, cookies, flags);
186 llog_ctxt_put(cctxt);
187 if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
188 CERROR("%s: objid "DOSTID" subobj "DOSTID
189 " on OST idx %d: rc = %d\n",
190 obd->obd_name, POSTID(&lsm->lsm_oi),
191 POSTID(&loi->loi_oi), loi->loi_ost_idx, err);
192 if (!rc)
193 rc = err;
194 }
195 }
196 obd_putref(obd);
197 RETURN(rc);
198}
199
200static struct llog_operations lov_mds_ost_orig_logops = {
201 .lop_obd_add = lov_llog_origin_add,
202 .lop_connect = lov_llog_origin_connect,
203};
204
205static struct llog_operations lov_size_repl_logops = {
206 .lop_cancel = lov_llog_repl_cancel,
207};
208
209int lov_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
210 struct obd_device *disk_obd, int *index)
211{
212 struct lov_obd *lov = &obd->u.lov;
213 struct obd_device *child;
214 int i, rc = 0;
215 ENTRY;
216
217 LASSERT(olg == &obd->obd_olg);
218 rc = llog_setup(NULL, obd, olg, LLOG_MDS_OST_ORIG_CTXT, disk_obd,
219 &lov_mds_ost_orig_logops);
220 if (rc)
221 RETURN(rc);
222
223 rc = llog_setup(NULL, obd, olg, LLOG_SIZE_REPL_CTXT, disk_obd,
224 &lov_size_repl_logops);
225 if (rc)
226 GOTO(err_cleanup, rc);
227
228 obd_getref(obd);
229
230 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
231 if (!lov->lov_tgts[i])
232 continue;
233
234 if (index && i != *index)
235 continue;
236
237 child = lov->lov_tgts[i]->ltd_obd;
238 rc = obd_llog_init(child, &child->obd_olg, disk_obd, &i);
239 if (rc)
240 CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
241 "(rc=%d)\n", i, child->obd_name,
242 disk_obd->obd_name, rc);
243 rc = 0;
244 }
245 obd_putref(obd);
246 GOTO(err_cleanup, rc);
247err_cleanup:
248 if (rc) {
249 struct llog_ctxt *ctxt =
250 llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
251 if (ctxt)
252 llog_cleanup(NULL, ctxt);
253 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
254 if (ctxt)
255 llog_cleanup(NULL, ctxt);
256 }
257 return rc;
258}
259
260int lov_llog_finish(struct obd_device *obd, int count)
261{
262 struct llog_ctxt *ctxt;
263
264 ENTRY;
265
266
267
268 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
269 if (ctxt)
270 llog_cleanup(NULL, ctxt);
271
272 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
273 if (ctxt)
274 llog_cleanup(NULL, ctxt);
275
276
277 RETURN(0);
278}
279