linux/tools/lib/bpf/ringbuf.c
<<
>>
Prefs
   1// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
   2/*
   3 * Ring buffer operations.
   4 *
   5 * Copyright (C) 2020 Facebook, Inc.
   6 */
   7#ifndef _GNU_SOURCE
   8#define _GNU_SOURCE
   9#endif
  10#include <stdlib.h>
  11#include <stdio.h>
  12#include <errno.h>
  13#include <unistd.h>
  14#include <linux/err.h>
  15#include <linux/bpf.h>
  16#include <asm/barrier.h>
  17#include <sys/mman.h>
  18#include <sys/epoll.h>
  19#include <tools/libc_compat.h>
  20
  21#include "libbpf.h"
  22#include "libbpf_internal.h"
  23#include "bpf.h"
  24
  25/* make sure libbpf doesn't use kernel-only integer typedefs */
  26#pragma GCC poison u8 u16 u32 u64 s8 s16 s32 s64
  27
/* Per-map state for a single BPF ringbuf map managed by a ring_buffer. */
struct ring {
	/* callback invoked for every non-discarded sample */
	ring_buffer_sample_fn sample_cb;
	/* opaque pointer handed to sample_cb as its first argument */
	void *ctx;
	/* start of the data area; located one page past producer_pos */
	void *data;
	/* consumer position; start of the writable page mapped at offset 0 */
	unsigned long *consumer_pos;
	/* producer position; start of the read-only mapping that also holds
	 * the data area
	 */
	unsigned long *producer_pos;
	/* map's max_entries - 1; used to wrap positions into the data area */
	unsigned long mask;
	/* file descriptor of the underlying ringbuf map */
	int map_fd;
};
  37
/* Manager for a set of ringbuf maps polled through one epoll instance. */
struct ring_buffer {
	/* epoll event array, one slot per ring; also used as the output
	 * buffer for epoll_wait()
	 */
	struct epoll_event *events;
	/* array of ring_cnt registered rings */
	struct ring *rings;
	/* value of getpagesize(), cached at creation time */
	size_t page_size;
	/* epoll instance all ring map FDs are registered with */
	int epoll_fd;
	/* number of valid entries in rings and events */
	int ring_cnt;
};
  45
  46static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
  47{
  48        if (r->consumer_pos) {
  49                munmap(r->consumer_pos, rb->page_size);
  50                r->consumer_pos = NULL;
  51        }
  52        if (r->producer_pos) {
  53                munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
  54                r->producer_pos = NULL;
  55        }
  56}
  57
  58/* Add extra RINGBUF maps to this ring buffer manager */
  59int ring_buffer__add(struct ring_buffer *rb, int map_fd,
  60                     ring_buffer_sample_fn sample_cb, void *ctx)
  61{
  62        struct bpf_map_info info;
  63        __u32 len = sizeof(info);
  64        struct epoll_event *e;
  65        struct ring *r;
  66        void *tmp;
  67        int err;
  68
  69        memset(&info, 0, sizeof(info));
  70
  71        err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
  72        if (err) {
  73                err = -errno;
  74                pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
  75                        map_fd, err);
  76                return err;
  77        }
  78
  79        if (info.type != BPF_MAP_TYPE_RINGBUF) {
  80                pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
  81                        map_fd);
  82                return -EINVAL;
  83        }
  84
  85        tmp = reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
  86        if (!tmp)
  87                return -ENOMEM;
  88        rb->rings = tmp;
  89
  90        tmp = reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
  91        if (!tmp)
  92                return -ENOMEM;
  93        rb->events = tmp;
  94
  95        r = &rb->rings[rb->ring_cnt];
  96        memset(r, 0, sizeof(*r));
  97
  98        r->map_fd = map_fd;
  99        r->sample_cb = sample_cb;
 100        r->ctx = ctx;
 101        r->mask = info.max_entries - 1;
 102
 103        /* Map writable consumer page */
 104        tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
 105                   map_fd, 0);
 106        if (tmp == MAP_FAILED) {
 107                err = -errno;
 108                pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
 109                        map_fd, err);
 110                return err;
 111        }
 112        r->consumer_pos = tmp;
 113
 114        /* Map read-only producer page and data pages. We map twice as big
 115         * data size to allow simple reading of samples that wrap around the
 116         * end of a ring buffer. See kernel implementation for details.
 117         * */
 118        tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, PROT_READ,
 119                   MAP_SHARED, map_fd, rb->page_size);
 120        if (tmp == MAP_FAILED) {
 121                err = -errno;
 122                ringbuf_unmap_ring(rb, r);
 123                pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
 124                        map_fd, err);
 125                return err;
 126        }
 127        r->producer_pos = tmp;
 128        r->data = tmp + rb->page_size;
 129
 130        e = &rb->events[rb->ring_cnt];
 131        memset(e, 0, sizeof(*e));
 132
 133        e->events = EPOLLIN;
 134        e->data.fd = rb->ring_cnt;
 135        if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
 136                err = -errno;
 137                ringbuf_unmap_ring(rb, r);
 138                pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
 139                        map_fd, err);
 140                return err;
 141        }
 142
 143        rb->ring_cnt++;
 144        return 0;
 145}
 146
 147void ring_buffer__free(struct ring_buffer *rb)
 148{
 149        int i;
 150
 151        if (!rb)
 152                return;
 153
 154        for (i = 0; i < rb->ring_cnt; ++i)
 155                ringbuf_unmap_ring(rb, &rb->rings[i]);
 156        if (rb->epoll_fd >= 0)
 157                close(rb->epoll_fd);
 158
 159        free(rb->events);
 160        free(rb->rings);
 161        free(rb);
 162}
 163
 164struct ring_buffer *
 165ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
 166                 const struct ring_buffer_opts *opts)
 167{
 168        struct ring_buffer *rb;
 169        int err;
 170
 171        if (!OPTS_VALID(opts, ring_buffer_opts))
 172                return NULL;
 173
 174        rb = calloc(1, sizeof(*rb));
 175        if (!rb)
 176                return NULL;
 177
 178        rb->page_size = getpagesize();
 179
 180        rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
 181        if (rb->epoll_fd < 0) {
 182                err = -errno;
 183                pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
 184                goto err_out;
 185        }
 186
 187        err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
 188        if (err)
 189                goto err_out;
 190
 191        return rb;
 192
 193err_out:
 194        ring_buffer__free(rb);
 195        return NULL;
 196}
 197
 198static inline int roundup_len(__u32 len)
 199{
 200        /* clear out top 2 bits (discard and busy, if set) */
 201        len <<= 2;
 202        len >>= 2;
 203        /* add length prefix */
 204        len += BPF_RINGBUF_HDR_SZ;
 205        /* round up to 8 byte alignment */
 206        return (len + 7) / 8 * 8;
 207}
 208
 209static int ringbuf_process_ring(struct ring* r)
 210{
 211        int *len_ptr, len, err, cnt = 0;
 212        unsigned long cons_pos, prod_pos;
 213        bool got_new_data;
 214        void *sample;
 215
 216        cons_pos = smp_load_acquire(r->consumer_pos);
 217        do {
 218                got_new_data = false;
 219                prod_pos = smp_load_acquire(r->producer_pos);
 220                while (cons_pos < prod_pos) {
 221                        len_ptr = r->data + (cons_pos & r->mask);
 222                        len = smp_load_acquire(len_ptr);
 223
 224                        /* sample not committed yet, bail out for now */
 225                        if (len & BPF_RINGBUF_BUSY_BIT)
 226                                goto done;
 227
 228                        got_new_data = true;
 229                        cons_pos += roundup_len(len);
 230
 231                        if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
 232                                sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
 233                                err = r->sample_cb(r->ctx, sample, len);
 234                                if (err) {
 235                                        /* update consumer pos and bail out */
 236                                        smp_store_release(r->consumer_pos,
 237                                                          cons_pos);
 238                                        return err;
 239                                }
 240                                cnt++;
 241                        }
 242
 243                        smp_store_release(r->consumer_pos, cons_pos);
 244                }
 245        } while (got_new_data);
 246done:
 247        return cnt;
 248}
 249
 250/* Consume available ring buffer(s) data without event polling.
 251 * Returns number of records consumed across all registered ring buffers, or
 252 * negative number if any of the callbacks return error.
 253 */
 254int ring_buffer__consume(struct ring_buffer *rb)
 255{
 256        int i, err, res = 0;
 257
 258        for (i = 0; i < rb->ring_cnt; i++) {
 259                struct ring *ring = &rb->rings[i];
 260
 261                err = ringbuf_process_ring(ring);
 262                if (err < 0)
 263                        return err;
 264                res += err;
 265        }
 266        return res;
 267}
 268
 269/* Poll for available data and consume records, if any are available.
 270 * Returns number of records consumed, or negative number, if any of the
 271 * registered callbacks returned error.
 272 */
 273int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
 274{
 275        int i, cnt, err, res = 0;
 276
 277        cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
 278        for (i = 0; i < cnt; i++) {
 279                __u32 ring_id = rb->events[i].data.fd;
 280                struct ring *ring = &rb->rings[ring_id];
 281
 282                err = ringbuf_process_ring(ring);
 283                if (err < 0)
 284                        return err;
 285                res += cnt;
 286        }
 287        return cnt < 0 ? -errno : res;
 288}
 289