linux/tools/lib/bpf/ringbuf.c
<<
>>
Prefs
   1// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
   2/*
   3 * Ring buffer operations.
   4 *
   5 * Copyright (C) 2020 Facebook, Inc.
   6 */
   7#ifndef _GNU_SOURCE
   8#define _GNU_SOURCE
   9#endif
  10#include <stdlib.h>
  11#include <stdio.h>
  12#include <errno.h>
  13#include <unistd.h>
  14#include <linux/err.h>
  15#include <linux/bpf.h>
  16#include <asm/barrier.h>
  17#include <sys/mman.h>
  18#include <sys/epoll.h>
  19
  20#include "libbpf.h"
  21#include "libbpf_internal.h"
  22#include "bpf.h"
  23
/* Consumer-side state for one BPF ring buffer map managed by a
 * struct ring_buffer.
 */
struct ring {
        /* callback invoked for every committed, non-discarded sample */
        ring_buffer_sample_fn sample_cb;
        /* opaque pointer handed back to sample_cb as its first argument */
        void *ctx;
        /* start of the data area; located one page past producer_pos */
        void *data;
        /* consumer position; lives in the page mapped read/write at offset 0 */
        unsigned long *consumer_pos;
        /* producer position; first page of the read-only mapping */
        unsigned long *producer_pos;
        /* max_entries - 1; used to wrap positions into the data area */
        unsigned long mask;
        /* file descriptor of the underlying map */
        int map_fd;
};
  33
/* Manager for a set of ring buffer maps that are polled through a single
 * epoll instance.
 */
struct ring_buffer {
        /* scratch array passed to epoll_wait(); grown alongside rings */
        struct epoll_event *events;
        /* array of ring_cnt registered rings */
        struct ring *rings;
        /* value of getpagesize(), cached when the manager is created */
        size_t page_size;
        /* epoll instance every ring's map fd is registered with */
        int epoll_fd;
        /* number of valid entries in rings */
        int ring_cnt;
};
  41
  42static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
  43{
  44        if (r->consumer_pos) {
  45                munmap(r->consumer_pos, rb->page_size);
  46                r->consumer_pos = NULL;
  47        }
  48        if (r->producer_pos) {
  49                munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
  50                r->producer_pos = NULL;
  51        }
  52}
  53
  54/* Add extra RINGBUF maps to this ring buffer manager */
  55int ring_buffer__add(struct ring_buffer *rb, int map_fd,
  56                     ring_buffer_sample_fn sample_cb, void *ctx)
  57{
  58        struct bpf_map_info info;
  59        __u32 len = sizeof(info);
  60        struct epoll_event *e;
  61        struct ring *r;
  62        void *tmp;
  63        int err;
  64
  65        memset(&info, 0, sizeof(info));
  66
  67        err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
  68        if (err) {
  69                err = -errno;
  70                pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
  71                        map_fd, err);
  72                return libbpf_err(err);
  73        }
  74
  75        if (info.type != BPF_MAP_TYPE_RINGBUF) {
  76                pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
  77                        map_fd);
  78                return libbpf_err(-EINVAL);
  79        }
  80
  81        tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
  82        if (!tmp)
  83                return libbpf_err(-ENOMEM);
  84        rb->rings = tmp;
  85
  86        tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
  87        if (!tmp)
  88                return libbpf_err(-ENOMEM);
  89        rb->events = tmp;
  90
  91        r = &rb->rings[rb->ring_cnt];
  92        memset(r, 0, sizeof(*r));
  93
  94        r->map_fd = map_fd;
  95        r->sample_cb = sample_cb;
  96        r->ctx = ctx;
  97        r->mask = info.max_entries - 1;
  98
  99        /* Map writable consumer page */
 100        tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
 101                   map_fd, 0);
 102        if (tmp == MAP_FAILED) {
 103                err = -errno;
 104                pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
 105                        map_fd, err);
 106                return libbpf_err(err);
 107        }
 108        r->consumer_pos = tmp;
 109
 110        /* Map read-only producer page and data pages. We map twice as big
 111         * data size to allow simple reading of samples that wrap around the
 112         * end of a ring buffer. See kernel implementation for details.
 113         * */
 114        tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, PROT_READ,
 115                   MAP_SHARED, map_fd, rb->page_size);
 116        if (tmp == MAP_FAILED) {
 117                err = -errno;
 118                ringbuf_unmap_ring(rb, r);
 119                pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
 120                        map_fd, err);
 121                return libbpf_err(err);
 122        }
 123        r->producer_pos = tmp;
 124        r->data = tmp + rb->page_size;
 125
 126        e = &rb->events[rb->ring_cnt];
 127        memset(e, 0, sizeof(*e));
 128
 129        e->events = EPOLLIN;
 130        e->data.fd = rb->ring_cnt;
 131        if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
 132                err = -errno;
 133                ringbuf_unmap_ring(rb, r);
 134                pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
 135                        map_fd, err);
 136                return libbpf_err(err);
 137        }
 138
 139        rb->ring_cnt++;
 140        return 0;
 141}
 142
 143void ring_buffer__free(struct ring_buffer *rb)
 144{
 145        int i;
 146
 147        if (!rb)
 148                return;
 149
 150        for (i = 0; i < rb->ring_cnt; ++i)
 151                ringbuf_unmap_ring(rb, &rb->rings[i]);
 152        if (rb->epoll_fd >= 0)
 153                close(rb->epoll_fd);
 154
 155        free(rb->events);
 156        free(rb->rings);
 157        free(rb);
 158}
 159
 160struct ring_buffer *
 161ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
 162                 const struct ring_buffer_opts *opts)
 163{
 164        struct ring_buffer *rb;
 165        int err;
 166
 167        if (!OPTS_VALID(opts, ring_buffer_opts))
 168                return errno = EINVAL, NULL;
 169
 170        rb = calloc(1, sizeof(*rb));
 171        if (!rb)
 172                return errno = ENOMEM, NULL;
 173
 174        rb->page_size = getpagesize();
 175
 176        rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
 177        if (rb->epoll_fd < 0) {
 178                err = -errno;
 179                pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
 180                goto err_out;
 181        }
 182
 183        err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
 184        if (err)
 185                goto err_out;
 186
 187        return rb;
 188
 189err_out:
 190        ring_buffer__free(rb);
 191        return errno = -err, NULL;
 192}
 193
 194static inline int roundup_len(__u32 len)
 195{
 196        /* clear out top 2 bits (discard and busy, if set) */
 197        len <<= 2;
 198        len >>= 2;
 199        /* add length prefix */
 200        len += BPF_RINGBUF_HDR_SZ;
 201        /* round up to 8 byte alignment */
 202        return (len + 7) / 8 * 8;
 203}
 204
 205static int64_t ringbuf_process_ring(struct ring* r)
 206{
 207        int *len_ptr, len, err;
 208        /* 64-bit to avoid overflow in case of extreme application behavior */
 209        int64_t cnt = 0;
 210        unsigned long cons_pos, prod_pos;
 211        bool got_new_data;
 212        void *sample;
 213
 214        cons_pos = smp_load_acquire(r->consumer_pos);
 215        do {
 216                got_new_data = false;
 217                prod_pos = smp_load_acquire(r->producer_pos);
 218                while (cons_pos < prod_pos) {
 219                        len_ptr = r->data + (cons_pos & r->mask);
 220                        len = smp_load_acquire(len_ptr);
 221
 222                        /* sample not committed yet, bail out for now */
 223                        if (len & BPF_RINGBUF_BUSY_BIT)
 224                                goto done;
 225
 226                        got_new_data = true;
 227                        cons_pos += roundup_len(len);
 228
 229                        if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
 230                                sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
 231                                err = r->sample_cb(r->ctx, sample, len);
 232                                if (err < 0) {
 233                                        /* update consumer pos and bail out */
 234                                        smp_store_release(r->consumer_pos,
 235                                                          cons_pos);
 236                                        return err;
 237                                }
 238                                cnt++;
 239                        }
 240
 241                        smp_store_release(r->consumer_pos, cons_pos);
 242                }
 243        } while (got_new_data);
 244done:
 245        return cnt;
 246}
 247
 248/* Consume available ring buffer(s) data without event polling.
 249 * Returns number of records consumed across all registered ring buffers (or
 250 * INT_MAX, whichever is less), or negative number if any of the callbacks
 251 * return error.
 252 */
 253int ring_buffer__consume(struct ring_buffer *rb)
 254{
 255        int64_t err, res = 0;
 256        int i;
 257
 258        for (i = 0; i < rb->ring_cnt; i++) {
 259                struct ring *ring = &rb->rings[i];
 260
 261                err = ringbuf_process_ring(ring);
 262                if (err < 0)
 263                        return libbpf_err(err);
 264                res += err;
 265        }
 266        if (res > INT_MAX)
 267                return INT_MAX;
 268        return res;
 269}
 270
 271/* Poll for available data and consume records, if any are available.
 272 * Returns number of records consumed (or INT_MAX, whichever is less), or
 273 * negative number, if any of the registered callbacks returned error.
 274 */
 275int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
 276{
 277        int i, cnt;
 278        int64_t err, res = 0;
 279
 280        cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
 281        if (cnt < 0)
 282                return libbpf_err(-errno);
 283
 284        for (i = 0; i < cnt; i++) {
 285                __u32 ring_id = rb->events[i].data.fd;
 286                struct ring *ring = &rb->rings[ring_id];
 287
 288                err = ringbuf_process_ring(ring);
 289                if (err < 0)
 290                        return libbpf_err(err);
 291                res += err;
 292        }
 293        if (res > INT_MAX)
 294                return INT_MAX;
 295        return res;
 296}
 297
 298/* Get an fd that can be used to sleep until data is available in the ring(s) */
 299int ring_buffer__epoll_fd(const struct ring_buffer *rb)
 300{
 301        return rb->epoll_fd;
 302}
 303