/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_distributor_list, rte_distributor_single);

static struct rte_tailq_elem rte_distributor_tailq = {
	.name = "RTE_DISTRIBUTOR",
};
EAL_REGISTER_TAILQ(rte_distributor_tailq)

/**** APIs called by workers ****/

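/* Worker side: post a request for a new packet to the distributor, handing
 * back oldpkt (if any) at the same time. The call does not block; the reply
 * is picked up later via the poll function.
 */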
void
rte_distributor_request_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_GET_BUF;

	/* Wait until any previous request has been handled (flag bits clear). */
	RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
		==, 0, __ATOMIC_RELAXED);

	/* Sync with distributor on GET_BUF flag. */
	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
}

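/* Worker side: check whether the distributor has answered an earlier request.
 * Returns the new packet, or NULL if no packet has been assigned yet.
 */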
struct rte_mbuf *
rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];

	/* Sync with distributor. Acquire bufptr64. */
	if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF)
		return NULL;

	/* Since bufptr64 is signed, this should be an arithmetic shift. */
	int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
	return (struct rte_mbuf *)((uintptr_t)ret);
}

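/* Worker side: request a new packet and busy-wait (with rte_pause) until the
 * distributor assigns one.
 */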
struct rte_mbuf *
rte_distributor_get_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	struct rte_mbuf *ret;

	rte_distributor_request_pkt_single(d, worker_id, oldpkt);
	while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
		rte_pause();
	return ret;
}

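/* Worker side: give a packet back to the distributor without asking for a new
 * one, e.g. when the worker thread is shutting down.
 */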
int
rte_distributor_return_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_RETURN_BUF;

	/* Wait until any previous request has been handled (flag bits clear). */
	RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
		==, 0, __ATOMIC_RELAXED);

	/* Sync with distributor on RETURN_BUF flag. */
	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
	return 0;
}

/**** APIs called on distributor core ****/

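/* Add a packet to the backlog queue kept for a particular worker. Returns -1
 * if the backlog is full.
 */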
static int
add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
{
	if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
		return -1;

	bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
			= item;
	return 0;
}

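/* Take the next queued packet for a worker off its backlog. */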
static int64_t
backlog_pop(struct rte_distributor_backlog *bl)
{
	bl->count--;
	return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
}

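/* Store a packet returned from a worker in the returns array. */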
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
		unsigned *ret_start, unsigned *ret_count)
{
	/* Store returns in a circular buffer - this code is branch-free. */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
}

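/* Handle a RETURN_BUF request from a worker that is shutting down: clear its
 * in-flight state and redistribute any packets queued in its backlog.
 */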
static inline void
handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
{
	d->in_flight_tags[wkr] = 0;
	d->in_flight_bitmask &= ~(1UL << wkr);

	/* Sync with worker. Release bufptr64. */
	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
	if (unlikely(d->backlog[wkr].count != 0)) {
		/* The worker is going away, so its queued packets must be
		 * moved elsewhere. The easiest solution is to feed them back
		 * through rte_distributor_process_single(), which will assign
		 * them to the next worker that becomes free. Starvation
		 * cannot occur as long as new packets keep arriving via
		 * process().
		 */
		struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
		unsigned i;
		struct rte_distributor_backlog *bl = &d->backlog[wkr];

		for (i = 0; i < bl->count; i++) {
			unsigned idx = (bl->start + i) &
					RTE_DISTRIB_BACKLOG_MASK;
			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
					RTE_DISTRIB_FLAG_BITS));
		}
		/* Recursive call. Note that the tags were already set before
		 * the first-level call to rte_distributor_process_single().
		 */
		rte_distributor_process_single(d, pkts, i);
		bl->count = bl->start = 0;
	}
}

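/* Called when process() is invoked without any new packets. Goes through all
 * the workers, hands out queued backlog items and collects any packets the
 * workers have handed back, i.e. it performs a partial flush.
 */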
static int
process_returns(struct rte_distributor_single *d)
{
	unsigned wkr;
	unsigned flushed = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	for (wkr = 0; wkr < d->num_workers; wkr++) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
							__ATOMIC_ACQUIRE);

		if (data & RTE_DISTRIB_GET_BUF) {
			flushed++;
			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
					backlog_pop(&d->backlog[wkr]),
					__ATOMIC_RELEASE);
			else {
				/* Sync with worker on GET_BUF flag. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
					RTE_DISTRIB_GET_BUF,
					__ATOMIC_RELEASE);
				d->in_flight_tags[wkr] = 0;
				d->in_flight_bitmask &= ~(1UL << wkr);
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		store_return(oldbuf, d, &ret_start, &ret_count);
	}

	d->returns.start = ret_start;
	d->returns.count = ret_count;

	return flushed;
}

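/* Distributor side: process a burst of packets, assigning them to workers.
 * A packet whose tag (mbuf->hash.usr) is already in flight on a worker is
 * queued to that same worker, so a given flow is handled by one worker at a
 * time.
 */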
int
rte_distributor_process_single(struct rte_distributor_single *d,
		struct rte_mbuf **mbufs, unsigned num_mbufs)
{
	unsigned next_idx = 0;
	unsigned wkr = 0;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint32_t new_tag = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	if (unlikely(num_mbufs == 0))
		return process_returns(d);

	while (next_idx < num_mbufs || next_mb != NULL) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
						__ATOMIC_ACQUIRE);

		if (!next_mb) {
			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb)
					<< RTE_DISTRIB_FLAG_BITS);
			/*
			 * The caller is expected to set a tag value in each
			 * mbuf (hash.usr) before calling process(). The tag
			 * identifies a flow or session.
			 */
			new_tag = next_mb->hash.usr;

			/*
			 * The in-flight bitmask is a single 64-bit word, so
			 * at most 64 workers can be tracked here (checked by
			 * RTE_BUILD_BUG_ON at create time).
			 */
			uint64_t match = 0;
			unsigned i;
			/*
			 * To scan for a match use "xor" and "not" to get
			 * a 0/1 value, then use shifting to merge into a
			 * single "match" variable, where a one-bit indicates
			 * a match for the worker given by the bit-position.
			 */
			for (i = 0; i < d->num_workers; i++)
				match |= (!(d->in_flight_tags[i] ^ new_tag)
					<< i);

			/* Only turned-on bits are considered as a match. */
			match &= d->in_flight_bitmask;

			if (match) {
				next_mb = NULL;
				unsigned worker = __builtin_ctzl(match);
				if (add_to_backlog(&d->backlog[worker],
						next_value) < 0)
					next_idx--;
			}
		}

		if ((data & RTE_DISTRIB_GET_BUF) &&
				(d->backlog[wkr].count || next_mb)) {

			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
						backlog_pop(&d->backlog[wkr]),
						__ATOMIC_RELEASE);

			else {
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
						next_value,
						__ATOMIC_RELEASE);
				d->in_flight_tags[wkr] = new_tag;
				d->in_flight_bitmask |= (1UL << wkr);
				next_mb = NULL;
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		/* Store returns in a circular buffer. */
		store_return(oldbuf, d, &ret_start, &ret_count);

		if (++wkr == d->num_workers)
			wkr = 0;
	}

	/* To finish, check all workers for backlog and schedule work for them
	 * if they are ready.
	 */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		if (d->backlog[wkr].count &&
				/* Sync with worker. Acquire bufptr64. */
				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {

			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
					RTE_DISTRIB_FLAG_BITS;

			store_return(oldbuf, d, &ret_start, &ret_count);

			/* Sync with worker. Release bufptr64. */
			__atomic_store_n(&(d->bufs[wkr].bufptr64),
				backlog_pop(&d->backlog[wkr]),
				__ATOMIC_RELEASE);
		}

	d->returns.start = ret_start;
	d->returns.count = ret_count;
	return num_mbufs;
}

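/* Copy packets returned from workers out to the caller's array, up to
 * max_mbufs entries.
 */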
int
rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
		struct rte_mbuf **mbufs, unsigned max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned i;

	for (i = 0; i < retval; i++) {
		unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}

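/* Return the number of packets in flight in the distributor, i.e. packets
 * currently being worked on plus packets queued in the per-worker backlogs.
 */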
static inline unsigned
total_outstanding(const struct rte_distributor_single *d)
{
	unsigned wkr, total_outstanding;

	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count;

	return total_outstanding;
}

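/* Flush the distributor, so that there are no outstanding packets in flight
 * or queued up.
 */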
int
rte_distributor_flush_single(struct rte_distributor_single *d)
{
	const unsigned flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process_single(d, NULL, 0);

	return flushed;
}

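/* Clear the distributor's internal array of returned packets. */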
void
rte_distributor_clear_returns_single(struct rte_distributor_single *d)
{
	d->returns.start = d->returns.count = 0;
#ifndef __OPTIMIZE__
	memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
#endif
}

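/* Create a new distributor instance in a named memzone and register it on the
 * global tailq of distributors.
 */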
struct rte_distributor_single *
rte_distributor_create_single(const char *name,
		unsigned socket_id,
		unsigned num_workers)
{
	struct rte_distributor_single *d;
	struct rte_distributor_list *distributor_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;

	/* Compilation-time checks. */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
	RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
				sizeof(d->in_flight_bitmask) * CHAR_BIT);

	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
		rte_errno = EINVAL;
		return NULL;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;

	distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
					  rte_distributor_list);

	rte_mcfg_tailq_write_lock();
	TAILQ_INSERT_TAIL(distributor_list, d, next);
	rte_mcfg_tailq_write_unlock();

	return d;
}