1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_RPC
38
39#include "../../include/linux/libcfs/libcfs.h"
40# ifdef __mips64__
41# include <linux/kernel.h>
42# endif
43
44#include "../include/obd_class.h"
45#include "../include/lustre_net.h"
46#include "../include/lustre_sec.h"
47#include "ptlrpc_internal.h"
48
49lnet_handle_eq_t ptlrpc_eq_h;
50
51
52
53
/*
 * Client's outgoing request callback: fires when the request message has
 * been sent (LNET_EVENT_SEND) or the send MD was unlinked without sending
 * (LNET_EVENT_UNLINK).  Drops the network's reference on the request.
 */
void request_out_callback(lnet_event_t *ev)
{
	struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
	struct ptlrpc_request *req = cbid->cbid_arg;

	LASSERT(ev->type == LNET_EVENT_SEND ||
		ev->type == LNET_EVENT_UNLINK);
	LASSERT(ev->unlinked);

	DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);

	sptlrpc_request_out_callback(req);
	spin_lock(&req->rq_lock);
	req->rq_real_sent = ktime_get_real_seconds();
	if (ev->unlinked)
		req->rq_req_unlink = 0;

	if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {
		/* Failed send: flag a network error and wake any waiter so
		 * the upper layer notices and can recover/resend.
		 */
		req->rq_net_err = 1;
		ptlrpc_client_wake_req(req);
	}
	spin_unlock(&req->rq_lock);

	/* drop the ref the network held while the send was in flight */
	ptlrpc_req_finished(req);
}
84
85
86
87
/*
 * Client's incoming reply callback: handles both early replies (adaptive
 * timeouts) and the real reply, which land in the same reply buffer.
 */
void reply_in_callback(lnet_event_t *ev)
{
	struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
	struct ptlrpc_request *req = cbid->cbid_arg;

	DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);

	LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
	LASSERT(ev->md.start == req->rq_repbuf);
	LASSERT(ev->offset + ev->mlength <= req->rq_repbuf_len);
	/* The reply MD must be remote-managed so the server controls the
	 * offset it writes at (early reply at 0, real reply possibly later).
	 */
	LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

	spin_lock(&req->rq_lock);

	req->rq_receiving_reply = 0;
	req->rq_early = 0;
	if (ev->unlinked)
		req->rq_reply_unlink = 0;

	if (ev->status)
		goto out_wake;

	if (ev->type == LNET_EVENT_UNLINK) {
		/* unlinked before anything arrived; just wake the waiter */
		LASSERT(ev->unlinked);
		DEBUG_REQ(D_NET, req, "unlink");
		goto out_wake;
	}

	if (ev->mlength < ev->rlength) {
		/* reply was larger than the buffer we posted: report
		 * truncation via -EOVERFLOW
		 */
		CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
		       req->rq_replen, ev->rlength, ev->offset);
		req->rq_reply_truncate = 1;
		req->rq_replied = 1;
		req->rq_status = -EOVERFLOW;
		req->rq_nob_received = ev->rlength + ev->offset;
		goto out_wake;
	}

	if ((ev->offset == 0) &&
	    ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
		/* Early reply: offset 0 and the request advertised AT support */
		DEBUG_REQ(D_ADAPTTO, req,
			  "Early reply received: mlen=%u offset=%d replen=%d replied=%d unlinked=%d",
			  ev->mlength, ev->offset,
			  req->rq_replen, req->rq_replied, ev->unlinked);

		req->rq_early_count++;	/* number of early replies seen */

		/* already have the real reply: the early one is stale */
		if (req->rq_replied)
			goto out_wake;

		req->rq_early = 1;
		req->rq_reply_off = ev->offset;
		req->rq_nob_received = ev->mlength;
		/* buffer stays posted for the real reply */
		req->rq_receiving_reply = 1;
	} else {
		/* Real reply */
		req->rq_rep_swab_mask = 0;
		req->rq_replied = 1;
		/* got the reply, so no resend is required anymore */
		req->rq_resend = 0;
		req->rq_reply_off = ev->offset;
		req->rq_nob_received = ev->mlength;
		/* NOTE(review): the MD is unlinked elsewhere — presumably in
		 * ptlrpc_unregister_reply(); confirm against that code.
		 */
		DEBUG_REQ(D_INFO, req,
			  "reply in flags=%x mlen=%u offset=%d replen=%d",
			  lustre_msg_get_flags(req->rq_reqmsg),
			  ev->mlength, ev->offset, req->rq_replen);
	}

	req->rq_import->imp_last_reply_time = ktime_get_real_seconds();

out_wake:
	/* NB don't unlock until after the wakeup: the request can disappear
	 * under us once it is no longer waiting, and we hold no private ref.
	 */
	ptlrpc_client_wake_req(req);
	spin_unlock(&req->rq_lock);
}
173
174
175
176
/*
 * Client's bulk callback: the bulk data has been written (PUT into a sink)
 * or read (GET from a source), or the MD was unlinked.
 */
void client_bulk_callback(lnet_event_t *ev)
{
	struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
	struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
	struct ptlrpc_request *req;

	/* event type must match the bulk direction */
	LASSERT((desc->bd_type == BULK_PUT_SINK &&
		 ev->type == LNET_EVENT_PUT) ||
		(desc->bd_type == BULK_GET_SOURCE &&
		 ev->type == LNET_EVENT_GET) ||
		ev->type == LNET_EVENT_UNLINK);
	LASSERT(ev->unlinked);

	/* fault injection: force a bulk I/O error for testing */
	if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
		ev->status = -EIO;

	if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,
				 CFS_FAIL_ONCE))
		ev->status = -EIO;

	CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
	       "event type %d, status %d, desc %p\n",
	       ev->type, ev->status, desc);

	spin_lock(&desc->bd_lock);
	req = desc->bd_req;
	LASSERT(desc->bd_md_count > 0);
	desc->bd_md_count--;

	if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
		desc->bd_nob_transferred += ev->mlength;
		desc->bd_sender = ev->sender;
	} else {
		/* network error: flag the request so the client resends */
		spin_lock(&req->rq_lock);
		req->rq_net_err = 1;
		spin_unlock(&req->rq_lock);
	}

	if (ev->status != 0)
		desc->bd_failure = 1;

	/* NB don't unlock until after the wakeup: wake only when the last
	 * MD completes, since the descriptor can disappear under us once
	 * the waiter runs.
	 */
	if (desc->bd_md_count == 0)
		ptlrpc_client_wake_req(desc->bd_req);

	spin_unlock(&desc->bd_lock);
}
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/*
 * History sequence number layout (64 bits):
 *   [ arrival seconds : high 32 ][ usec/16 : next 16 ][ CPT id / seq : low ]
 * The low REQS_SEQ_SHIFT (== srv_cpt_bits) bits initially carry the CPT id
 * and are incremented on collisions within the same usec bucket; the
 * LASSERT below requires srv_cpt_bits < 16 so the bump stays below the
 * usec field.
 */
#define REQS_CPT_BITS(svcpt)	((svcpt)->scp_service->srv_cpt_bits)

#define REQS_SEC_SHIFT		32
#define REQS_USEC_SHIFT		16
#define REQS_SEQ_SHIFT(svcpt)	REQS_CPT_BITS(svcpt)

/*
 * Assign a unique, increasing history sequence to \a req and append it to
 * the partition's request history list.  Must be called with
 * svcpt->scp_lock held (see request_in_callback()).
 */
static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
				   struct ptlrpc_request *req)
{
	__u64 sec = req->rq_arrival_time.tv_sec;
	/* usec / 16 so the value (max 999999/16 = 62499) fits in 16 bits */
	__u32 usec = req->rq_arrival_time.tv_nsec / NSEC_PER_USEC / 16;
	__u64 new_seq;

	new_seq = (sec << REQS_SEC_SHIFT) |
		  (usec << REQS_USEC_SHIFT) |
		  (svcpt->scp_cpt < 0 ? 0 : svcpt->scp_cpt);

	if (new_seq > svcpt->scp_hist_seq) {
		/* first request, or we moved into a new time bucket */
		svcpt->scp_hist_seq = new_seq;
	} else {
		LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT);
		/* Collision in the current usec bucket: bump the low
		 * sequence bits.  If they overflow, the carry spills into
		 * the usec field ("borrowing" from future time), which
		 * still keeps the sequence strictly increasing.
		 */
		svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt));
		new_seq = svcpt->scp_hist_seq;
	}

	req->rq_history_seq = new_seq;

	list_add_tail(&req->rq_history_list, &svcpt->scp_hist_reqs);
}
286
287
288
289
/*
 * Server's incoming request callback: a client PUT landed in one of this
 * service partition's request buffers (or the buffer MD was unlinked).
 */
void request_in_callback(lnet_event_t *ev)
{
	struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
	struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
	struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
	struct ptlrpc_service *service = svcpt->scp_service;
	struct ptlrpc_request *req;

	LASSERT(ev->type == LNET_EVENT_PUT ||
		ev->type == LNET_EVENT_UNLINK);
	LASSERT((char *)ev->md.start >= rqbd->rqbd_buffer);
	LASSERT((char *)ev->md.start + ev->offset + ev->mlength <=
		rqbd->rqbd_buffer + service->srv_buf_size);

	CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
	       "event type %d, status %d, service %s\n",
	       ev->type, ev->status, service->srv_name);

	if (ev->unlinked) {
		/* The buffer is done with: use the request descriptor
		 * embedded in the rqbd itself so this path cannot fail on
		 * allocation (we could not re-post the rqbd from here).
		 */
		req = &rqbd->rqbd_req;
		memset(req, 0, sizeof(*req));
	} else {
		LASSERT(ev->type == LNET_EVENT_PUT);
		if (ev->status != 0) {
			/* error already reported by the CDEBUG above */
			return;
		}
		/* GFP_ATOMIC: we are in (soft)irq-like callback context */
		req = ptlrpc_request_cache_alloc(GFP_ATOMIC);
		if (!req) {
			CERROR("Can't allocate incoming request descriptor: Dropping %s RPC from %s\n",
			       service->srv_name,
			       libcfs_id2str(ev->initiator));
			return;
		}
	}

	/* NB we rely on req being zeroed (memset above or zeroing
	 * allocator): pointers NULL, flags clear, scalars zero.  Only set
	 * the received length for a successful PUT.
	 */
	req->rq_xid = ev->match_bits;
	req->rq_reqbuf = ev->md.start + ev->offset;
	if (ev->type == LNET_EVENT_PUT && ev->status == 0)
		req->rq_reqdata_len = ev->mlength;
	ktime_get_real_ts64(&req->rq_arrival_time);
	req->rq_peer = ev->initiator;
	req->rq_self = ev->target.nid;
	req->rq_rqbd = rqbd;
	req->rq_phase = RQ_PHASE_NEW;
	spin_lock_init(&req->rq_lock);
	INIT_LIST_HEAD(&req->rq_timed_list);
	INIT_LIST_HEAD(&req->rq_exp_list);
	atomic_set(&req->rq_refcount, 1);
	if (ev->type == LNET_EVENT_PUT)
		CDEBUG(D_INFO, "incoming req@%p x%llu msgsize %u\n",
		       req, req->rq_xid, ev->mlength);

	CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

	spin_lock(&svcpt->scp_lock);

	/* requires scp_lock held */
	ptlrpc_req_add_history(svcpt, req);

	if (ev->unlinked) {
		svcpt->scp_nrqbds_posted--;
		CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
		       svcpt->scp_nrqbds_posted);

		/* Only warn about running out of posted buffers when the
		 * test_req_buffer_pressure knob is set and this was a real
		 * receive, not an unlink during shutdown.
		 */
		if (test_req_buffer_pressure &&
		    ev->type != LNET_EVENT_UNLINK &&
		    svcpt->scp_nrqbds_posted == 0)
			CWARN("All %s request buffers busy\n",
			      service->srv_name);

		/* req takes over the network's ref on rqbd */
	} else {
		/* req takes its own ref on rqbd */
		rqbd->rqbd_refcount++;
	}

	list_add_tail(&req->rq_list, &svcpt->scp_req_incoming);
	svcpt->scp_nreqs_incoming++;

	/* NB everything can disappear under us once the request is queued
	 * and we unlock, so do the wakeup while still holding the lock.
	 */
	wake_up(&svcpt->scp_waitq);

	spin_unlock(&svcpt->scp_lock);
}
389
390
391
392
/*
 * Server's outgoing reply callback: the reply has been sent, ACKed, or its
 * MD unlinked.
 */
void reply_out_callback(lnet_event_t *ev)
{
	struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
	struct ptlrpc_reply_state *rs = cbid->cbid_arg;
	struct ptlrpc_service_part *svcpt = rs->rs_svcpt;

	LASSERT(ev->type == LNET_EVENT_SEND ||
		ev->type == LNET_EVENT_ACK ||
		ev->type == LNET_EVENT_UNLINK);

	if (!rs->rs_difficult) {
		/* "Easy" replies need no further processing: drop the
		 * network's (only) ref on the reply state right away.
		 */
		LASSERT(ev->unlinked);
		ptlrpc_rs_decref(rs);
		return;
	}

	LASSERT(rs->rs_on_net);

	if (ev->unlinked) {
		/* Last network event for this reply.  Schedule the
		 * "difficult" reply for final handling unless it is still
		 * waiting for its transno to commit (no-ACK case).
		 * Lock order: scp_rep_lock then rs_lock.
		 */
		spin_lock(&svcpt->scp_rep_lock);
		spin_lock(&rs->rs_lock);

		rs->rs_on_net = 0;
		if (!rs->rs_no_ack ||
		    rs->rs_transno <=
		    rs->rs_export->exp_obd->obd_last_committed)
			ptlrpc_schedule_difficult_reply(rs);

		spin_unlock(&rs->rs_lock);
		spin_unlock(&svcpt->scp_rep_lock);
	}
}
431
432static void ptlrpc_master_callback(lnet_event_t *ev)
433{
434 struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
435 void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;
436
437
438 LASSERT(cbid->cbid_arg != LP_POISON);
439 LASSERT(callback == request_out_callback ||
440 callback == reply_in_callback ||
441 callback == client_bulk_callback ||
442 callback == request_in_callback ||
443 callback == reply_out_callback);
444
445 callback(ev);
446}
447
448int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
449 lnet_process_id_t *peer, lnet_nid_t *self)
450{
451 int best_dist = 0;
452 __u32 best_order = 0;
453 int count = 0;
454 int rc = -ENOENT;
455 int dist;
456 __u32 order;
457 lnet_nid_t dst_nid;
458 lnet_nid_t src_nid;
459
460 peer->pid = LNET_PID_LUSTRE;
461
462
463 while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
464 dist = LNetDist(dst_nid, &src_nid, &order);
465 if (dist < 0)
466 continue;
467
468 if (dist == 0) {
469 peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
470 rc = 0;
471 break;
472 }
473
474 if (rc < 0 ||
475 dist < best_dist ||
476 (dist == best_dist && order < best_order)) {
477 best_dist = dist;
478 best_order = order;
479
480 peer->nid = dst_nid;
481 *self = src_nid;
482 rc = 0;
483 }
484 }
485
486 CDEBUG(D_NET, "%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
487 return rc;
488}
489
/*
 * Tear down the ptlrpc network layer: free the event queue (retrying while
 * it is still busy with in-flight events) and then shut down LNet.
 */
static void ptlrpc_ni_fini(void)
{
	wait_queue_head_t waitq;
	struct l_wait_info lwi;
	int rc;
	int retries;

	/* The EQ can stay busy while messages are still in flight; keep
	 * retrying until LNetEQFree() succeeds.
	 */
	for (retries = 0;; retries++) {
		rc = LNetEQFree(ptlrpc_eq_h);
		switch (rc) {
		default:
			/* any other error is unexpected and fatal */
			LBUG();

		case 0:
			LNetNIFini();
			return;

		case -EBUSY:
			/* only warn from the second attempt onwards */
			if (retries != 0)
				CWARN("Event queue still busy\n");

			/* sleep ~2 seconds before retrying; the waitqueue is
			 * private and never woken, so this is a pure timeout
			 */
			init_waitqueue_head(&waitq);
			lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
			l_wait_event(waitq, 0, &lwi);
			break;
		}
	}
	/* not reached */
}
526
527static lnet_pid_t ptl_get_pid(void)
528{
529 lnet_pid_t pid;
530
531 pid = LNET_PID_LUSTRE;
532 return pid;
533}
534
535static int ptlrpc_ni_init(void)
536{
537 int rc;
538 lnet_pid_t pid;
539
540 pid = ptl_get_pid();
541 CDEBUG(D_NET, "My pid is: %x\n", pid);
542
543
544 rc = LNetNIInit(pid);
545 if (rc < 0) {
546 CDEBUG(D_NET, "Can't init network interface: %d\n", rc);
547 return -ENOENT;
548 }
549
550
551
552
553
554
555
556
557
558 rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
559 if (rc == 0)
560 return 0;
561
562 CERROR("Failed to allocate event queue: %d\n", rc);
563 LNetNIFini();
564
565 return -ENOMEM;
566}
567
568int ptlrpc_init_portals(void)
569{
570 int rc = ptlrpc_ni_init();
571
572 if (rc != 0) {
573 CERROR("network initialisation failed\n");
574 return -EIO;
575 }
576 rc = ptlrpcd_addref();
577 if (rc == 0)
578 return 0;
579
580 CERROR("rpcd initialisation failed\n");
581 ptlrpc_ni_fini();
582 return rc;
583}
584
/*
 * Module-level ptlrpc network shutdown: release the ptlrpcd daemons first,
 * then tear down the network layer (reverse of ptlrpc_init_portals()).
 */
void ptlrpc_exit_portals(void)
{
	ptlrpcd_decref();
	ptlrpc_ni_fini();
}
590