linux/kernel/trace/ring_buffer.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Generic ring buffer
   4 *
   5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
   6 */
   7#include <linux/trace_events.h>
   8#include <linux/ring_buffer.h>
   9#include <linux/trace_clock.h>
  10#include <linux/sched/clock.h>
  11#include <linux/trace_seq.h>
  12#include <linux/spinlock.h>
  13#include <linux/irq_work.h>
  14#include <linux/security.h>
  15#include <linux/uaccess.h>
  16#include <linux/hardirq.h>
  17#include <linux/kthread.h>      /* for self test */
  18#include <linux/module.h>
  19#include <linux/percpu.h>
  20#include <linux/mutex.h>
  21#include <linux/delay.h>
  22#include <linux/slab.h>
  23#include <linux/init.h>
  24#include <linux/hash.h>
  25#include <linux/list.h>
  26#include <linux/cpu.h>
  27#include <linux/oom.h>
  28
  29#include <asm/local.h>
  30
  31static void update_pages_handler(struct work_struct *work);
  32
  33/*
  34 * The ring buffer header is special. We must manually up keep it.
  35 */
  36int ring_buffer_print_entry_header(struct trace_seq *s)
  37{
  38        trace_seq_puts(s, "# compressed entry header\n");
  39        trace_seq_puts(s, "\ttype_len    :    5 bits\n");
  40        trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
  41        trace_seq_puts(s, "\tarray       :   32 bits\n");
  42        trace_seq_putc(s, '\n');
  43        trace_seq_printf(s, "\tpadding     : type == %d\n",
  44                         RINGBUF_TYPE_PADDING);
  45        trace_seq_printf(s, "\ttime_extend : type == %d\n",
  46                         RINGBUF_TYPE_TIME_EXTEND);
  47        trace_seq_printf(s, "\ttime_stamp : type == %d\n",
  48                         RINGBUF_TYPE_TIME_STAMP);
  49        trace_seq_printf(s, "\tdata max type_len  == %d\n",
  50                         RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
  51
  52        return !trace_seq_has_overflowed(s);
  53}
  54
  55/*
  56 * The ring buffer is made up of a list of pages. A separate list of pages is
  57 * allocated for each CPU. A writer may only write to a buffer that is
  58 * associated with the CPU it is currently executing on.  A reader may read
  59 * from any per cpu buffer.
  60 *
  61 * The reader is special. For each per cpu buffer, the reader has its own
  62 * reader page. When a reader has read the entire reader page, this reader
  63 * page is swapped with another page in the ring buffer.
  64 *
  65 * Now, as long as the writer is off the reader page, the reader can do what
  66 * ever it wants with that page. The writer will never write to that page
  67 * again (as long as it is out of the ring buffer).
  68 *
  69 * Here's some silly ASCII art.
  70 *
  71 *   +------+
  72 *   |reader|          RING BUFFER
  73 *   |page  |
  74 *   +------+        +---+   +---+   +---+
  75 *                   |   |-->|   |-->|   |
  76 *                   +---+   +---+   +---+
  77 *                     ^               |
  78 *                     |               |
  79 *                     +---------------+
  80 *
  81 *
  82 *   +------+
  83 *   |reader|          RING BUFFER
  84 *   |page  |------------------v
  85 *   +------+        +---+   +---+   +---+
  86 *                   |   |-->|   |-->|   |
  87 *                   +---+   +---+   +---+
  88 *                     ^               |
  89 *                     |               |
  90 *                     +---------------+
  91 *
  92 *
  93 *   +------+
  94 *   |reader|          RING BUFFER
  95 *   |page  |------------------v
  96 *   +------+        +---+   +---+   +---+
  97 *      ^            |   |-->|   |-->|   |
  98 *      |            +---+   +---+   +---+
  99 *      |                              |
 100 *      |                              |
 101 *      +------------------------------+
 102 *
 103 *
 104 *   +------+
 105 *   |buffer|          RING BUFFER
 106 *   |page  |------------------v
 107 *   +------+        +---+   +---+   +---+
 108 *      ^            |   |   |   |-->|   |
 109 *      |   New      +---+   +---+   +---+
 110 *      |  Reader------^               |
 111 *      |   page                       |
 112 *      +------------------------------+
 113 *
 114 *
 115 * After we make this swap, the reader can hand this page off to the splice
 116 * code and be done with it. It can even allocate a new page if it needs to
 117 * and swap that into the ring buffer.
 118 *
 119 * We will be using cmpxchg soon to make all this lockless.
 120 *
 121 */
 122
 123/* Used for individual buffers (after the counter) */
 124#define RB_BUFFER_OFF           (1 << 20)
 125
 126#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
 127
 128#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
 129#define RB_ALIGNMENT            4U
 130#define RB_MAX_SMALL_DATA       (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
 131#define RB_EVNT_MIN_SIZE        8U      /* two 32bit words */
 132#define RB_ALIGN_DATA           __aligned(RB_ALIGNMENT)
 133
 134/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
 135#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
 136
 137enum {
 138        RB_LEN_TIME_EXTEND = 8,
 139        RB_LEN_TIME_STAMP =  8,
 140};
 141
 142#define skip_time_extend(event) \
 143        ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
 144
 145#define extended_time(event) \
 146        (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
 147
 148static inline int rb_null_event(struct ring_buffer_event *event)
 149{
 150        return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
 151}
 152
 153static void rb_event_set_padding(struct ring_buffer_event *event)
 154{
 155        /* padding has a NULL time_delta */
 156        event->type_len = RINGBUF_TYPE_PADDING;
 157        event->time_delta = 0;
 158}
 159
 160static unsigned
 161rb_event_data_length(struct ring_buffer_event *event)
 162{
 163        unsigned length;
 164
 165        if (event->type_len)
 166                length = event->type_len * RB_ALIGNMENT;
 167        else
 168                length = event->array[0];
 169        return length + RB_EVNT_HDR_SIZE;
 170}
 171
 172/*
 173 * Return the length of the given event. Will return
 174 * the length of the time extend if the event is a
 175 * time extend.
 176 */
 177static inline unsigned
 178rb_event_length(struct ring_buffer_event *event)
 179{
 180        switch (event->type_len) {
 181        case RINGBUF_TYPE_PADDING:
 182                if (rb_null_event(event))
 183                        /* undefined */
 184                        return -1;
 185                return  event->array[0] + RB_EVNT_HDR_SIZE;
 186
 187        case RINGBUF_TYPE_TIME_EXTEND:
 188                return RB_LEN_TIME_EXTEND;
 189
 190        case RINGBUF_TYPE_TIME_STAMP:
 191                return RB_LEN_TIME_STAMP;
 192
 193        case RINGBUF_TYPE_DATA:
 194                return rb_event_data_length(event);
 195        default:
 196                BUG();
 197        }
 198        /* not hit */
 199        return 0;
 200}
 201
 202/*
 203 * Return total length of time extend and data,
 204 *   or just the event length for all other events.
 205 */
 206static inline unsigned
 207rb_event_ts_length(struct ring_buffer_event *event)
 208{
 209        unsigned len = 0;
 210
 211        if (extended_time(event)) {
 212                /* time extends include the data event after it */
 213                len = RB_LEN_TIME_EXTEND;
 214                event = skip_time_extend(event);
 215        }
 216        return len + rb_event_length(event);
 217}
 218
 219/**
 220 * ring_buffer_event_length - return the length of the event
 221 * @event: the event to get the length of
 222 *
 223 * Returns the size of the data load of a data event.
 224 * If the event is something other than a data event, it
 225 * returns the size of the event itself. With the exception
 226 * of a TIME EXTEND, where it still returns the size of the
 227 * data load of the data event after it.
 228 */
 229unsigned ring_buffer_event_length(struct ring_buffer_event *event)
 230{
 231        unsigned length;
 232
 233        if (extended_time(event))
 234                event = skip_time_extend(event);
 235
 236        length = rb_event_length(event);
 237        if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
 238                return length;
 239        length -= RB_EVNT_HDR_SIZE;
 240        if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
 241                length -= sizeof(event->array[0]);
 242        return length;
 243}
 244EXPORT_SYMBOL_GPL(ring_buffer_event_length);
 245
 246/* inline for ring buffer fast paths */
 247static __always_inline void *
 248rb_event_data(struct ring_buffer_event *event)
 249{
 250        if (extended_time(event))
 251                event = skip_time_extend(event);
 252        BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
 253        /* If length is in len field, then array[0] has the data */
 254        if (event->type_len)
 255                return (void *)&event->array[0];
 256        /* Otherwise length is in array[0] and array[1] has the data */
 257        return (void *)&event->array[1];
 258}
 259
 260/**
 261 * ring_buffer_event_data - return the data of the event
 262 * @event: the event to get the data from
 263 */
 264void *ring_buffer_event_data(struct ring_buffer_event *event)
 265{
 266        return rb_event_data(event);
 267}
 268EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 269
 270#define for_each_buffer_cpu(buffer, cpu)                \
 271        for_each_cpu(cpu, buffer->cpumask)
 272
 273#define TS_SHIFT        27
 274#define TS_MASK         ((1ULL << TS_SHIFT) - 1)
 275#define TS_DELTA_TEST   (~TS_MASK)
 276
 277/**
 278 * ring_buffer_event_time_stamp - return the event's extended timestamp
 279 * @event: the event to get the timestamp of
 280 *
 281 * Returns the extended timestamp associated with a data event.
 282 * An extended time_stamp is a 64-bit timestamp represented
 283 * internally in a special way that makes the best use of space
 284 * contained within a ring buffer event.  This function decodes
 285 * it and maps it to a straight u64 value.
 286 */
 287u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event)
 288{
 289        u64 ts;
 290
 291        ts = event->array[0];
 292        ts <<= TS_SHIFT;
 293        ts += event->time_delta;
 294
 295        return ts;
 296}
 297
 298/* Flag when events were overwritten */
 299#define RB_MISSED_EVENTS        (1 << 31)
 300/* Missed count stored at end */
 301#define RB_MISSED_STORED        (1 << 30)
 302
 303#define RB_MISSED_FLAGS         (RB_MISSED_EVENTS|RB_MISSED_STORED)
 304
 305struct buffer_data_page {
 306        u64              time_stamp;    /* page time stamp */
 307        local_t          commit;        /* write committed index */
 308        unsigned char    data[] RB_ALIGN_DATA;  /* data of buffer page */
 309};
 310
 311/*
 312 * Note, the buffer_page list must be first. The buffer pages
 313 * are allocated in cache lines, which means that each buffer
 314 * page will be at the beginning of a cache line, and thus
 315 * the least significant bits will be zero. We use this to
 316 * add flags in the list struct pointers, to make the ring buffer
 317 * lockless.
 318 */
 319struct buffer_page {
 320        struct list_head list;          /* list of buffer pages */
 321        local_t          write;         /* index for next write */
 322        unsigned         read;          /* index for next read */
 323        local_t          entries;       /* entries on this page */
 324        unsigned long    real_end;      /* real end of data */
 325        struct buffer_data_page *page;  /* Actual data page */
 326};
 327
 328/*
 329 * The buffer page counters, write and entries, must be reset
 330 * atomically when crossing page boundaries. To synchronize this
 331 * update, two counters are inserted into the number. One is
 332 * the actual counter for the write position or count on the page.
 333 *
 334 * The other is a counter of updaters. Before an update happens
 335 * the update partition of the counter is incremented. This will
 336 * allow the updater to update the counter atomically.
 337 *
 338 * The counter is 20 bits, and the state data is 12.
 339 */
 340#define RB_WRITE_MASK           0xfffff
 341#define RB_WRITE_INTCNT         (1 << 20)
 342
 343static void rb_init_page(struct buffer_data_page *bpage)
 344{
 345        local_set(&bpage->commit, 0);
 346}
 347
 348/*
 349 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 350 * this issue out.
 351 */
 352static void free_buffer_page(struct buffer_page *bpage)
 353{
 354        free_page((unsigned long)bpage->page);
 355        kfree(bpage);
 356}
 357
 358/*
 359 * We need to fit the time_stamp delta into 27 bits.
 360 */
 361static inline int test_time_stamp(u64 delta)
 362{
 363        if (delta & TS_DELTA_TEST)
 364                return 1;
 365        return 0;
 366}
 367
 368#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
 369
 370/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
 371#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
 372
 373int ring_buffer_print_page_header(struct trace_seq *s)
 374{
 375        struct buffer_data_page field;
 376
 377        trace_seq_printf(s, "\tfield: u64 timestamp;\t"
 378                         "offset:0;\tsize:%u;\tsigned:%u;\n",
 379                         (unsigned int)sizeof(field.time_stamp),
 380                         (unsigned int)is_signed_type(u64));
 381
 382        trace_seq_printf(s, "\tfield: local_t commit;\t"
 383                         "offset:%u;\tsize:%u;\tsigned:%u;\n",
 384                         (unsigned int)offsetof(typeof(field), commit),
 385                         (unsigned int)sizeof(field.commit),
 386                         (unsigned int)is_signed_type(long));
 387
 388        trace_seq_printf(s, "\tfield: int overwrite;\t"
 389                         "offset:%u;\tsize:%u;\tsigned:%u;\n",
 390                         (unsigned int)offsetof(typeof(field), commit),
 391                         1,
 392                         (unsigned int)is_signed_type(long));
 393
 394        trace_seq_printf(s, "\tfield: char data;\t"
 395                         "offset:%u;\tsize:%u;\tsigned:%u;\n",
 396                         (unsigned int)offsetof(typeof(field), data),
 397                         (unsigned int)BUF_PAGE_SIZE,
 398                         (unsigned int)is_signed_type(char));
 399
 400        return !trace_seq_has_overflowed(s);
 401}
 402
 403struct rb_irq_work {
 404        struct irq_work                 work;
 405        wait_queue_head_t               waiters;
 406        wait_queue_head_t               full_waiters;
 407        bool                            waiters_pending;
 408        bool                            full_waiters_pending;
 409        bool                            wakeup_full;
 410};
 411
 412/*
 413 * Structure to hold event state and handle nested events.
 414 */
 415struct rb_event_info {
 416        u64                     ts;
 417        u64                     delta;
 418        unsigned long           length;
 419        struct buffer_page      *tail_page;
 420        int                     add_timestamp;
 421};
 422
 423/*
 424 * Used for which event context the event is in.
 425 *  NMI     = 0
 426 *  IRQ     = 1
 427 *  SOFTIRQ = 2
 428 *  NORMAL  = 3
 429 *
 430 * See trace_recursive_lock() comment below for more details.
 431 */
 432enum {
 433        RB_CTX_NMI,
 434        RB_CTX_IRQ,
 435        RB_CTX_SOFTIRQ,
 436        RB_CTX_NORMAL,
 437        RB_CTX_MAX
 438};
 439
 440/*
 441 * head_page == tail_page && head == tail then buffer is empty.
 442 */
 443struct ring_buffer_per_cpu {
 444        int                             cpu;
 445        atomic_t                        record_disabled;
 446        struct ring_buffer              *buffer;
 447        raw_spinlock_t                  reader_lock;    /* serialize readers */
 448        arch_spinlock_t                 lock;
 449        struct lock_class_key           lock_key;
 450        struct buffer_data_page         *free_page;
 451        unsigned long                   nr_pages;
 452        unsigned int                    current_context;
 453        struct list_head                *pages;
 454        struct buffer_page              *head_page;     /* read from head */
 455        struct buffer_page              *tail_page;     /* write to tail */
 456        struct buffer_page              *commit_page;   /* committed pages */
 457        struct buffer_page              *reader_page;
 458        unsigned long                   lost_events;
 459        unsigned long                   last_overrun;
 460        unsigned long                   nest;
 461        local_t                         entries_bytes;
 462        local_t                         entries;
 463        local_t                         overrun;
 464        local_t                         commit_overrun;
 465        local_t                         dropped_events;
 466        local_t                         committing;
 467        local_t                         commits;
 468        local_t                         pages_touched;
 469        local_t                         pages_read;
 470        long                            last_pages_touch;
 471        size_t                          shortest_full;
 472        unsigned long                   read;
 473        unsigned long                   read_bytes;
 474        u64                             write_stamp;
 475        u64                             read_stamp;
 476        /* ring buffer pages to update, > 0 to add, < 0 to remove */
 477        long                            nr_pages_to_update;
 478        struct list_head                new_pages; /* new pages to add */
 479        struct work_struct              update_pages_work;
 480        struct completion               update_done;
 481
 482        struct rb_irq_work              irq_work;
 483};
 484
 485struct ring_buffer {
 486        unsigned                        flags;
 487        int                             cpus;
 488        atomic_t                        record_disabled;
 489        atomic_t                        resize_disabled;
 490        cpumask_var_t                   cpumask;
 491
 492        struct lock_class_key           *reader_lock_key;
 493
 494        struct mutex                    mutex;
 495
 496        struct ring_buffer_per_cpu      **buffers;
 497
 498        struct hlist_node               node;
 499        u64                             (*clock)(void);
 500
 501        struct rb_irq_work              irq_work;
 502        bool                            time_stamp_abs;
 503};
 504
 505struct ring_buffer_iter {
 506        struct ring_buffer_per_cpu      *cpu_buffer;
 507        unsigned long                   head;
 508        struct buffer_page              *head_page;
 509        struct buffer_page              *cache_reader_page;
 510        unsigned long                   cache_read;
 511        u64                             read_stamp;
 512};
 513
 514/**
 515 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 516 * @buffer: The ring_buffer to get the number of pages from
 517 * @cpu: The cpu of the ring_buffer to get the number of pages from
 518 *
 519 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 520 */
 521size_t ring_buffer_nr_pages(struct ring_buffer *buffer, int cpu)
 522{
 523        return buffer->buffers[cpu]->nr_pages;
 524}
 525
 526/**
 527 * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer
 528 * @buffer: The ring_buffer to get the number of pages from
 529 * @cpu: The cpu of the ring_buffer to get the number of pages from
 530 *
 531 * Returns the number of pages that have content in the ring buffer.
 532 */
 533size_t ring_buffer_nr_dirty_pages(struct ring_buffer *buffer, int cpu)
 534{
 535        size_t read;
 536        size_t cnt;
 537
 538        read = local_read(&buffer->buffers[cpu]->pages_read);
 539        cnt = local_read(&buffer->buffers[cpu]->pages_touched);
 540        /* The reader can read an empty page, but not more than that */
 541        if (cnt < read) {
 542                WARN_ON_ONCE(read > cnt + 1);
 543                return 0;
 544        }
 545
 546        return cnt - read;
 547}
 548
 549/*
 550 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 551 *
 552 * Schedules a delayed work to wake up any task that is blocked on the
 553 * ring buffer waiters queue.
 554 */
 555static void rb_wake_up_waiters(struct irq_work *work)
 556{
 557        struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
 558
 559        wake_up_all(&rbwork->waiters);
 560        if (rbwork->wakeup_full) {
 561                rbwork->wakeup_full = false;
 562                wake_up_all(&rbwork->full_waiters);
 563        }
 564}
 565
 566/**
 567 * ring_buffer_wait - wait for input to the ring buffer
 568 * @buffer: buffer to wait on
 569 * @cpu: the cpu buffer to wait on
 570 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
 571 *
 572 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 573 * as data is added to any of the @buffer's cpu buffers. Otherwise
 574 * it will wait for data to be added to a specific cpu buffer.
 575 */
 576int ring_buffer_wait(struct ring_buffer *buffer, int cpu, int full)
 577{
 578        struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer);
 579        DEFINE_WAIT(wait);
 580        struct rb_irq_work *work;
 581        int ret = 0;
 582
 583        /*
 584         * Depending on what the caller is waiting for, either any
 585         * data in any cpu buffer, or a specific buffer, put the
 586         * caller on the appropriate wait queue.
 587         */
 588        if (cpu == RING_BUFFER_ALL_CPUS) {
 589                work = &buffer->irq_work;
 590                /* Full only makes sense on per cpu reads */
 591                full = 0;
 592        } else {
 593                if (!cpumask_test_cpu(cpu, buffer->cpumask))
 594                        return -ENODEV;
 595                cpu_buffer = buffer->buffers[cpu];
 596                work = &cpu_buffer->irq_work;
 597        }
 598
 599
 600        while (true) {
 601                if (full)
 602                        prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
 603                else
 604                        prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
 605
 606                /*
 607                 * The events can happen in critical sections where
 608                 * checking a work queue can cause deadlocks.
 609                 * After adding a task to the queue, this flag is set
 610                 * only to notify events to try to wake up the queue
 611                 * using irq_work.
 612                 *
 613                 * We don't clear it even if the buffer is no longer
 614                 * empty. The flag only causes the next event to run
 615                 * irq_work to do the work queue wake up. The worse
 616                 * that can happen if we race with !trace_empty() is that
 617                 * an event will cause an irq_work to try to wake up
 618                 * an empty queue.
 619                 *
 620                 * There's no reason to protect this flag either, as
 621                 * the work queue and irq_work logic will do the necessary
 622                 * synchronization for the wake ups. The only thing
 623                 * that is necessary is that the wake up happens after
 624                 * a task has been queued. It's OK for spurious wake ups.
 625                 */
 626                if (full)
 627                        work->full_waiters_pending = true;
 628                else
 629                        work->waiters_pending = true;
 630
 631                if (signal_pending(current)) {
 632                        ret = -EINTR;
 633                        break;
 634                }
 635
 636                if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
 637                        break;
 638
 639                if (cpu != RING_BUFFER_ALL_CPUS &&
 640                    !ring_buffer_empty_cpu(buffer, cpu)) {
 641                        unsigned long flags;
 642                        bool pagebusy;
 643                        size_t nr_pages;
 644                        size_t dirty;
 645
 646                        if (!full)
 647                                break;
 648
 649                        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 650                        pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
 651                        nr_pages = cpu_buffer->nr_pages;
 652                        dirty = ring_buffer_nr_dirty_pages(buffer, cpu);
 653                        if (!cpu_buffer->shortest_full ||
 654                            cpu_buffer->shortest_full < full)
 655                                cpu_buffer->shortest_full = full;
 656                        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 657                        if (!pagebusy &&
 658                            (!nr_pages || (dirty * 100) > full * nr_pages))
 659                                break;
 660                }
 661
 662                schedule();
 663        }
 664
 665        if (full)
 666                finish_wait(&work->full_waiters, &wait);
 667        else
 668                finish_wait(&work->waiters, &wait);
 669
 670        return ret;
 671}
 672
 673/**
 674 * ring_buffer_poll_wait - poll on buffer input
 675 * @buffer: buffer to wait on
 676 * @cpu: the cpu buffer to wait on
 677 * @filp: the file descriptor
 678 * @poll_table: The poll descriptor
 679 *
 680 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 681 * as data is added to any of the @buffer's cpu buffers. Otherwise
 682 * it will wait for data to be added to a specific cpu buffer.
 683 *
 684 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 685 * zero otherwise.
 686 */
 687__poll_t ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
 688                          struct file *filp, poll_table *poll_table)
 689{
 690        struct ring_buffer_per_cpu *cpu_buffer;
 691        struct rb_irq_work *work;
 692
 693        if (cpu == RING_BUFFER_ALL_CPUS)
 694                work = &buffer->irq_work;
 695        else {
 696                if (!cpumask_test_cpu(cpu, buffer->cpumask))
 697                        return -EINVAL;
 698
 699                cpu_buffer = buffer->buffers[cpu];
 700                work = &cpu_buffer->irq_work;
 701        }
 702
 703        poll_wait(filp, &work->waiters, poll_table);
 704        work->waiters_pending = true;
 705        /*
 706         * There's a tight race between setting the waiters_pending and
 707         * checking if the ring buffer is empty.  Once the waiters_pending bit
 708         * is set, the next event will wake the task up, but we can get stuck
 709         * if there's only a single event in.
 710         *
 711         * FIXME: Ideally, we need a memory barrier on the writer side as well,
 712         * but adding a memory barrier to all events will cause too much of a
 713         * performance hit in the fast path.  We only need a memory barrier when
 714         * the buffer goes from empty to having content.  But as this race is
 715         * extremely small, and it's not a problem if another event comes in, we
 716         * will fix it later.
 717         */
 718        smp_mb();
 719
 720        if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
 721            (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
 722                return EPOLLIN | EPOLLRDNORM;
 723        return 0;
 724}
 725
 726/* buffer may be either ring_buffer or ring_buffer_per_cpu */
 727#define RB_WARN_ON(b, cond)                                             \
 728        ({                                                              \
 729                int _____ret = unlikely(cond);                          \
 730                if (_____ret) {                                         \
 731                        if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
 732                                struct ring_buffer_per_cpu *__b =       \
 733                                        (void *)b;                      \
 734                                atomic_inc(&__b->buffer->record_disabled); \
 735                        } else                                          \
 736                                atomic_inc(&b->record_disabled);        \
 737                        WARN_ON(1);                                     \
 738                }                                                       \
 739                _____ret;                                               \
 740        })
 741
 742/* Up this if you want to test the TIME_EXTENTS and normalization */
 743#define DEBUG_SHIFT 0
 744
 745static inline u64 rb_time_stamp(struct ring_buffer *buffer)
 746{
 747        /* shift to debug/test normalization and TIME_EXTENTS */
 748        return buffer->clock() << DEBUG_SHIFT;
 749}
 750
 751u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
 752{
 753        u64 time;
 754
 755        preempt_disable_notrace();
 756        time = rb_time_stamp(buffer);
 757        preempt_enable_notrace();
 758
 759        return time;
 760}
 761EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
 762
 763void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
 764                                      int cpu, u64 *ts)
 765{
 766        /* Just stupid testing the normalize function and deltas */
 767        *ts >>= DEBUG_SHIFT;
 768}
 769EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
 770
 771/*
 772 * Making the ring buffer lockless makes things tricky.
 773 * Although writes only happen on the CPU that they are on,
 774 * and they only need to worry about interrupts. Reads can
 775 * happen on any CPU.
 776 *
 777 * The reader page is always off the ring buffer, but when the
 778 * reader finishes with a page, it needs to swap its page with
 779 * a new one from the buffer. The reader needs to take from
 780 * the head (writes go to the tail). But if a writer is in overwrite
 781 * mode and wraps, it must push the head page forward.
 782 *
 783 * Here lies the problem.
 784 *
 785 * The reader must be careful to replace only the head page, and
 786 * not another one. As described at the top of the file in the
 787 * ASCII art, the reader sets its old page to point to the next
 788 * page after head. It then sets the page after head to point to
 789 * the old reader page. But if the writer moves the head page
 790 * during this operation, the reader could end up with the tail.
 791 *
 792 * We use cmpxchg to help prevent this race. We also do something
 793 * special with the page before head. We set the LSB to 1.
 794 *
 795 * When the writer must push the page forward, it will clear the
 796 * bit that points to the head page, move the head, and then set
 797 * the bit that points to the new head page.
 798 *
 799 * We also don't want an interrupt coming in and moving the head
 800 * page on another writer. Thus we use the second LSB to catch
 801 * that too. Thus:
 802 *
 803 * head->list->prev->next        bit 1          bit 0
 804 *                              -------        -------
 805 * Normal page                     0              0
 806 * Points to head page             0              1
 807 * New head page                   1              0
 808 *
 809 * Note we can not trust the prev pointer of the head page, because:
 810 *
 811 * +----+       +-----+        +-----+
 812 * |    |------>|  T  |---X--->|  N  |
 813 * |    |<------|     |        |     |
 814 * +----+       +-----+        +-----+
 815 *   ^                           ^ |
 816 *   |          +-----+          | |
 817 *   +----------|  R  |----------+ |
 818 *              |     |<-----------+
 819 *              +-----+
 820 *
 821 * Key:  ---X-->  HEAD flag set in pointer
 822 *         T      Tail page
 823 *         R      Reader page
 824 *         N      Next page
 825 *
 826 * (see __rb_reserve_next() to see where this happens)
 827 *
 828 *  What the above shows is that the reader just swapped out
 829 *  the reader page with a page in the buffer, but before it
 830 *  could make the new header point back to the new page added
 831 *  it was preempted by a writer. The writer moved forward onto
 832 *  the new page added by the reader and is about to move forward
 833 *  again.
 834 *
 835 *  You can see, it is legitimate for the previous pointer of
 836 *  the head (or any page) not to point back to itself. But only
 837 *  temporarily.
 838 */
 839
 840#define RB_PAGE_NORMAL          0UL
 841#define RB_PAGE_HEAD            1UL
 842#define RB_PAGE_UPDATE          2UL
 843
 844
 845#define RB_FLAG_MASK            3UL
 846
 847/* PAGE_MOVED is not part of the mask */
 848#define RB_PAGE_MOVED           4UL
 849
 850/*
 851 * rb_list_head - remove any bit
 852 */
 853static struct list_head *rb_list_head(struct list_head *list)
 854{
 855        unsigned long val = (unsigned long)list;
 856
 857        return (struct list_head *)(val & ~RB_FLAG_MASK);
 858}
 859
 860/*
 861 * rb_is_head_page - test if the given page is the head page
 862 *
 863 * Because the reader may move the head_page pointer, we can
 864 * not trust what the head page is (it may be pointing to
 865 * the reader page). But if the next page is a header page,
 866 * its flags will be non zero.
 867 */
 868static inline int
 869rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
 870                struct buffer_page *page, struct list_head *list)
 871{
 872        unsigned long val;
 873
 874        val = (unsigned long)list->next;
 875
 876        if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
 877                return RB_PAGE_MOVED;
 878
 879        return val & RB_FLAG_MASK;
 880}
 881
 882/*
 883 * rb_is_reader_page
 884 *
 885 * The unique thing about the reader page, is that, if the
 886 * writer is ever on it, the previous pointer never points
 887 * back to the reader page.
 888 */
 889static bool rb_is_reader_page(struct buffer_page *page)
 890{
 891        struct list_head *list = page->list.prev;
 892
 893        return rb_list_head(list->next) != &page->list;
 894}
 895
 896/*
 897 * rb_set_list_to_head - set a list_head to be pointing to head.
 898 */
 899static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
 900                                struct list_head *list)
 901{
 902        unsigned long *ptr;
 903
 904        ptr = (unsigned long *)&list->next;
 905        *ptr |= RB_PAGE_HEAD;
 906        *ptr &= ~RB_PAGE_UPDATE;
 907}
 908
 909/*
 910 * rb_head_page_activate - sets up head page
 911 */
 912static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
 913{
 914        struct buffer_page *head;
 915
 916        head = cpu_buffer->head_page;
 917        if (!head)
 918                return;
 919
 920        /*
 921         * Set the previous list pointer to have the HEAD flag.
 922         */
 923        rb_set_list_to_head(cpu_buffer, head->list.prev);
 924}
 925
 926static void rb_list_head_clear(struct list_head *list)
 927{
 928        unsigned long *ptr = (unsigned long *)&list->next;
 929
 930        *ptr &= ~RB_FLAG_MASK;
 931}
 932
 933/*
 934 * rb_head_page_deactivate - clears head page ptr (for free list)
 935 */
 936static void
 937rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
 938{
 939        struct list_head *hd;
 940
 941        /* Go through the whole list and clear any pointers found. */
 942        rb_list_head_clear(cpu_buffer->pages);
 943
 944        list_for_each(hd, cpu_buffer->pages)
 945                rb_list_head_clear(hd);
 946}
 947
 948static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
 949                            struct buffer_page *head,
 950                            struct buffer_page *prev,
 951                            int old_flag, int new_flag)
 952{
 953        struct list_head *list;
 954        unsigned long val = (unsigned long)&head->list;
 955        unsigned long ret;
 956
 957        list = &prev->list;
 958
 959        val &= ~RB_FLAG_MASK;
 960
 961        ret = cmpxchg((unsigned long *)&list->next,
 962                      val | old_flag, val | new_flag);
 963
 964        /* check if the reader took the page */
 965        if ((ret & ~RB_FLAG_MASK) != val)
 966                return RB_PAGE_MOVED;
 967
 968        return ret & RB_FLAG_MASK;
 969}
 970
 971static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
 972                                   struct buffer_page *head,
 973                                   struct buffer_page *prev,
 974                                   int old_flag)
 975{
 976        return rb_head_page_set(cpu_buffer, head, prev,
 977                                old_flag, RB_PAGE_UPDATE);
 978}
 979
 980static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
 981                                 struct buffer_page *head,
 982                                 struct buffer_page *prev,
 983                                 int old_flag)
 984{
 985        return rb_head_page_set(cpu_buffer, head, prev,
 986                                old_flag, RB_PAGE_HEAD);
 987}
 988
 989static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
 990                                   struct buffer_page *head,
 991                                   struct buffer_page *prev,
 992                                   int old_flag)
 993{
 994        return rb_head_page_set(cpu_buffer, head, prev,
 995                                old_flag, RB_PAGE_NORMAL);
 996}
 997
 998static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
 999                               struct buffer_page **bpage)
1000{
1001        struct list_head *p = rb_list_head((*bpage)->list.next);
1002
1003        *bpage = list_entry(p, struct buffer_page, list);
1004}
1005
1006static struct buffer_page *
1007rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1008{
1009        struct buffer_page *head;
1010        struct buffer_page *page;
1011        struct list_head *list;
1012        int i;
1013
1014        if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1015                return NULL;
1016
1017        /* sanity check */
1018        list = cpu_buffer->pages;
1019        if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1020                return NULL;
1021
1022        page = head = cpu_buffer->head_page;
1023        /*
1024         * It is possible that the writer moves the header behind
1025         * where we started, and we miss in one loop.
1026         * A second loop should grab the header, but we'll do
1027         * three loops just because I'm paranoid.
1028         */
1029        for (i = 0; i < 3; i++) {
1030                do {
1031                        if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
1032                                cpu_buffer->head_page = page;
1033                                return page;
1034                        }
1035                        rb_inc_page(cpu_buffer, &page);
1036                } while (page != head);
1037        }
1038
1039        RB_WARN_ON(cpu_buffer, 1);
1040
1041        return NULL;
1042}
1043
1044static int rb_head_page_replace(struct buffer_page *old,
1045                                struct buffer_page *new)
1046{
1047        unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1048        unsigned long val;
1049        unsigned long ret;
1050
1051        val = *ptr & ~RB_FLAG_MASK;
1052        val |= RB_PAGE_HEAD;
1053
1054        ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1055
1056        return ret == val;
1057}
1058
1059/*
1060 * rb_tail_page_update - move the tail page forward
1061 */
1062static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1063                               struct buffer_page *tail_page,
1064                               struct buffer_page *next_page)
1065{
1066        unsigned long old_entries;
1067        unsigned long old_write;
1068
1069        /*
1070         * The tail page now needs to be moved forward.
1071         *
1072         * We need to reset the tail page, but without messing
1073         * with possible erasing of data brought in by interrupts
1074         * that have moved the tail page and are currently on it.
1075         *
1076         * We add a counter to the write field to denote this.
1077         */
1078        old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1079        old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1080
1081        local_inc(&cpu_buffer->pages_touched);
1082        /*
1083         * Just make sure we have seen our old_write and synchronize
1084         * with any interrupts that come in.
1085         */
1086        barrier();
1087
1088        /*
1089         * If the tail page is still the same as what we think
1090         * it is, then it is up to us to update the tail
1091         * pointer.
1092         */
1093        if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1094                /* Zero the write counter */
1095                unsigned long val = old_write & ~RB_WRITE_MASK;
1096                unsigned long eval = old_entries & ~RB_WRITE_MASK;
1097
1098                /*
1099                 * This will only succeed if an interrupt did
1100                 * not come in and change it. In which case, we
1101                 * do not want to modify it.
1102                 *
1103                 * We add (void) to let the compiler know that we do not care
1104                 * about the return value of these functions. We use the
1105                 * cmpxchg to only update if an interrupt did not already
1106                 * do it for us. If the cmpxchg fails, we don't care.
1107                 */
1108                (void)local_cmpxchg(&next_page->write, old_write, val);
1109                (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1110
1111                /*
1112                 * No need to worry about races with clearing out the commit.
1113                 * it only can increment when a commit takes place. But that
1114                 * only happens in the outer most nested commit.
1115                 */
1116                local_set(&next_page->page->commit, 0);
1117
1118                /* Again, either we update tail_page or an interrupt does */
1119                (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
1120        }
1121}
1122
1123static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1124                          struct buffer_page *bpage)
1125{
1126        unsigned long val = (unsigned long)bpage;
1127
1128        if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1129                return 1;
1130
1131        return 0;
1132}
1133
1134/**
1135 * rb_check_list - make sure a pointer to a list has the last bits zero
1136 */
1137static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
1138                         struct list_head *list)
1139{
1140        if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
1141                return 1;
1142        if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
1143                return 1;
1144        return 0;
1145}
1146
1147/**
1148 * rb_check_pages - integrity check of buffer pages
1149 * @cpu_buffer: CPU buffer with pages to test
1150 *
1151 * As a safety measure we check to make sure the data pages have not
1152 * been corrupted.
1153 */
1154static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1155{
1156        struct list_head *head = cpu_buffer->pages;
1157        struct buffer_page *bpage, *tmp;
1158
1159        /* Reset the head page if it exists */
1160        if (cpu_buffer->head_page)
1161                rb_set_head_page(cpu_buffer);
1162
1163        rb_head_page_deactivate(cpu_buffer);
1164
1165        if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
1166                return -1;
1167        if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
1168                return -1;
1169
1170        if (rb_check_list(cpu_buffer, head))
1171                return -1;
1172
1173        list_for_each_entry_safe(bpage, tmp, head, list) {
1174                if (RB_WARN_ON(cpu_buffer,
1175                               bpage->list.next->prev != &bpage->list))
1176                        return -1;
1177                if (RB_WARN_ON(cpu_buffer,
1178                               bpage->list.prev->next != &bpage->list))
1179                        return -1;
1180                if (rb_check_list(cpu_buffer, &bpage->list))
1181                        return -1;
1182        }
1183
1184        rb_head_page_activate(cpu_buffer);
1185
1186        return 0;
1187}
1188
1189static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
1190{
1191        struct buffer_page *bpage, *tmp;
1192        bool user_thread = current->mm != NULL;
1193        gfp_t mflags;
1194        long i;
1195
1196        /*
1197         * Check if the available memory is there first.
1198         * Note, si_mem_available() only gives us a rough estimate of available
1199         * memory. It may not be accurate. But we don't care, we just want
1200         * to prevent doing any allocation when it is obvious that it is
1201         * not going to succeed.
1202         */
1203        i = si_mem_available();
1204        if (i < nr_pages)
1205                return -ENOMEM;
1206
1207        /*
1208         * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1209         * gracefully without invoking oom-killer and the system is not
1210         * destabilized.
1211         */
1212        mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1213
1214        /*
1215         * If a user thread allocates too much, and si_mem_available()
1216         * reports there's enough memory, even though there is not.
1217         * Make sure the OOM killer kills this thread. This can happen
1218         * even with RETRY_MAYFAIL because another task may be doing
1219         * an allocation after this task has taken all memory.
1220         * This is the task the OOM killer needs to take out during this
1221         * loop, even if it was triggered by an allocation somewhere else.
1222         */
1223        if (user_thread)
1224                set_current_oom_origin();
1225        for (i = 0; i < nr_pages; i++) {
1226                struct page *page;
1227
1228                bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1229                                    mflags, cpu_to_node(cpu));
1230                if (!bpage)
1231                        goto free_pages;
1232
1233                list_add(&bpage->list, pages);
1234
1235                page = alloc_pages_node(cpu_to_node(cpu), mflags, 0);
1236                if (!page)
1237                        goto free_pages;
1238                bpage->page = page_address(page);
1239                rb_init_page(bpage->page);
1240
1241                if (user_thread && fatal_signal_pending(current))
1242                        goto free_pages;
1243        }
1244        if (user_thread)
1245                clear_current_oom_origin();
1246
1247        return 0;
1248
1249free_pages:
1250        list_for_each_entry_safe(bpage, tmp, pages, list) {
1251                list_del_init(&bpage->list);
1252                free_buffer_page(bpage);
1253        }
1254        if (user_thread)
1255                clear_current_oom_origin();
1256
1257        return -ENOMEM;
1258}
1259
1260static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1261                             unsigned long nr_pages)
1262{
1263        LIST_HEAD(pages);
1264
1265        WARN_ON(!nr_pages);
1266
1267        if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
1268                return -ENOMEM;
1269
1270        /*
1271         * The ring buffer page list is a circular list that does not
1272         * start and end with a list head. All page list items point to
1273         * other pages.
1274         */
1275        cpu_buffer->pages = pages.next;
1276        list_del(&pages);
1277
1278        cpu_buffer->nr_pages = nr_pages;
1279
1280        rb_check_pages(cpu_buffer);
1281
1282        return 0;
1283}
1284
1285static struct ring_buffer_per_cpu *
1286rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
1287{
1288        struct ring_buffer_per_cpu *cpu_buffer;
1289        struct buffer_page *bpage;
1290        struct page *page;
1291        int ret;
1292
1293        cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1294                                  GFP_KERNEL, cpu_to_node(cpu));
1295        if (!cpu_buffer)
1296                return NULL;
1297
1298        cpu_buffer->cpu = cpu;
1299        cpu_buffer->buffer = buffer;
1300        raw_spin_lock_init(&cpu_buffer->reader_lock);
1301        lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1302        cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1303        INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1304        init_completion(&cpu_buffer->update_done);
1305        init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1306        init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1307        init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1308
1309        bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1310                            GFP_KERNEL, cpu_to_node(cpu));
1311        if (!bpage)
1312                goto fail_free_buffer;
1313
1314        rb_check_bpage(cpu_buffer, bpage);
1315
1316        cpu_buffer->reader_page = bpage;
1317        page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1318        if (!page)
1319                goto fail_free_reader;
1320        bpage->page = page_address(page);
1321        rb_init_page(bpage->page);
1322
1323        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1324        INIT_LIST_HEAD(&cpu_buffer->new_pages);
1325
1326        ret = rb_allocate_pages(cpu_buffer, nr_pages);
1327        if (ret < 0)
1328                goto fail_free_reader;
1329
1330        cpu_buffer->head_page
1331                = list_entry(cpu_buffer->pages, struct buffer_page, list);
1332        cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1333
1334        rb_head_page_activate(cpu_buffer);
1335
1336        return cpu_buffer;
1337
1338 fail_free_reader:
1339        free_buffer_page(cpu_buffer->reader_page);
1340
1341 fail_free_buffer:
1342        kfree(cpu_buffer);
1343        return NULL;
1344}
1345
1346static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1347{
1348        struct list_head *head = cpu_buffer->pages;
1349        struct buffer_page *bpage, *tmp;
1350
1351        free_buffer_page(cpu_buffer->reader_page);
1352
1353        rb_head_page_deactivate(cpu_buffer);
1354
1355        if (head) {
1356                list_for_each_entry_safe(bpage, tmp, head, list) {
1357                        list_del_init(&bpage->list);
1358                        free_buffer_page(bpage);
1359                }
1360                bpage = list_entry(head, struct buffer_page, list);
1361                free_buffer_page(bpage);
1362        }
1363
1364        kfree(cpu_buffer);
1365}
1366
1367/**
1368 * __ring_buffer_alloc - allocate a new ring_buffer
1369 * @size: the size in bytes per cpu that is needed.
1370 * @flags: attributes to set for the ring buffer.
1371 *
1372 * Currently the only flag that is available is the RB_FL_OVERWRITE
1373 * flag. This flag means that the buffer will overwrite old data
1374 * when the buffer wraps. If this flag is not set, the buffer will
1375 * drop data when the tail hits the head.
1376 */
1377struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1378                                        struct lock_class_key *key)
1379{
1380        struct ring_buffer *buffer;
1381        long nr_pages;
1382        int bsize;
1383        int cpu;
1384        int ret;
1385
1386        /* keep it in its own cache line */
1387        buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1388                         GFP_KERNEL);
1389        if (!buffer)
1390                return NULL;
1391
1392        if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1393                goto fail_free_buffer;
1394
1395        nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1396        buffer->flags = flags;
1397        buffer->clock = trace_clock_local;
1398        buffer->reader_lock_key = key;
1399
1400        init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1401        init_waitqueue_head(&buffer->irq_work.waiters);
1402
1403        /* need at least two pages */
1404        if (nr_pages < 2)
1405                nr_pages = 2;
1406
1407        buffer->cpus = nr_cpu_ids;
1408
1409        bsize = sizeof(void *) * nr_cpu_ids;
1410        buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1411                                  GFP_KERNEL);
1412        if (!buffer->buffers)
1413                goto fail_free_cpumask;
1414
1415        cpu = raw_smp_processor_id();
1416        cpumask_set_cpu(cpu, buffer->cpumask);
1417        buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1418        if (!buffer->buffers[cpu])
1419                goto fail_free_buffers;
1420
1421        ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1422        if (ret < 0)
1423                goto fail_free_buffers;
1424
1425        mutex_init(&buffer->mutex);
1426
1427        return buffer;
1428
1429 fail_free_buffers:
1430        for_each_buffer_cpu(buffer, cpu) {
1431                if (buffer->buffers[cpu])
1432                        rb_free_cpu_buffer(buffer->buffers[cpu]);
1433        }
1434        kfree(buffer->buffers);
1435
1436 fail_free_cpumask:
1437        free_cpumask_var(buffer->cpumask);
1438
1439 fail_free_buffer:
1440        kfree(buffer);
1441        return NULL;
1442}
1443EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
1444
1445/**
1446 * ring_buffer_free - free a ring buffer.
1447 * @buffer: the buffer to free.
1448 */
1449void
1450ring_buffer_free(struct ring_buffer *buffer)
1451{
1452        int cpu;
1453
1454        cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1455
1456        for_each_buffer_cpu(buffer, cpu)
1457                rb_free_cpu_buffer(buffer->buffers[cpu]);
1458
1459        kfree(buffer->buffers);
1460        free_cpumask_var(buffer->cpumask);
1461
1462        kfree(buffer);
1463}
1464EXPORT_SYMBOL_GPL(ring_buffer_free);
1465
1466void ring_buffer_set_clock(struct ring_buffer *buffer,
1467                           u64 (*clock)(void))
1468{
1469        buffer->clock = clock;
1470}
1471
1472void ring_buffer_set_time_stamp_abs(struct ring_buffer *buffer, bool abs)
1473{
1474        buffer->time_stamp_abs = abs;
1475}
1476
1477bool ring_buffer_time_stamp_abs(struct ring_buffer *buffer)
1478{
1479        return buffer->time_stamp_abs;
1480}
1481
1482static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1483
1484static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1485{
1486        return local_read(&bpage->entries) & RB_WRITE_MASK;
1487}
1488
1489static inline unsigned long rb_page_write(struct buffer_page *bpage)
1490{
1491        return local_read(&bpage->write) & RB_WRITE_MASK;
1492}
1493
1494static int
1495rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1496{
1497        struct list_head *tail_page, *to_remove, *next_page;
1498        struct buffer_page *to_remove_page, *tmp_iter_page;
1499        struct buffer_page *last_page, *first_page;
1500        unsigned long nr_removed;
1501        unsigned long head_bit;
1502        int page_entries;
1503
1504        head_bit = 0;
1505
1506        raw_spin_lock_irq(&cpu_buffer->reader_lock);
1507        atomic_inc(&cpu_buffer->record_disabled);
1508        /*
1509         * We don't race with the readers since we have acquired the reader
1510         * lock. We also don't race with writers after disabling recording.
1511         * This makes it easy to figure out the first and the last page to be
1512         * removed from the list. We unlink all the pages in between including
1513         * the first and last pages. This is done in a busy loop so that we
1514         * lose the least number of traces.
1515         * The pages are freed after we restart recording and unlock readers.
1516         */
1517        tail_page = &cpu_buffer->tail_page->list;
1518
1519        /*
1520         * tail page might be on reader page, we remove the next page
1521         * from the ring buffer
1522         */
1523        if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1524                tail_page = rb_list_head(tail_page->next);
1525        to_remove = tail_page;
1526
1527        /* start of pages to remove */
1528        first_page = list_entry(rb_list_head(to_remove->next),
1529                                struct buffer_page, list);
1530
1531        for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1532                to_remove = rb_list_head(to_remove)->next;
1533                head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1534        }
1535
1536        next_page = rb_list_head(to_remove)->next;
1537
1538        /*
1539         * Now we remove all pages between tail_page and next_page.
1540         * Make sure that we have head_bit value preserved for the
1541         * next page
1542         */
1543        tail_page->next = (struct list_head *)((unsigned long)next_page |
1544                                                head_bit);
1545        next_page = rb_list_head(next_page);
1546        next_page->prev = tail_page;
1547
1548        /* make sure pages points to a valid page in the ring buffer */
1549        cpu_buffer->pages = next_page;
1550
1551        /* update head page */
1552        if (head_bit)
1553                cpu_buffer->head_page = list_entry(next_page,
1554                                                struct buffer_page, list);
1555
1556        /*
1557         * change read pointer to make sure any read iterators reset
1558         * themselves
1559         */
1560        cpu_buffer->read = 0;
1561
1562        /* pages are removed, resume tracing and then free the pages */
1563        atomic_dec(&cpu_buffer->record_disabled);
1564        raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1565
1566        RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1567
1568        /* last buffer page to remove */
1569        last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1570                                list);
1571        tmp_iter_page = first_page;
1572
1573        do {
1574                cond_resched();
1575
1576                to_remove_page = tmp_iter_page;
1577                rb_inc_page(cpu_buffer, &tmp_iter_page);
1578
1579                /* update the counters */
1580                page_entries = rb_page_entries(to_remove_page);
1581                if (page_entries) {
1582                        /*
1583                         * If something was added to this page, it was full
1584                         * since it is not the tail page. So we deduct the
1585                         * bytes consumed in ring buffer from here.
1586                         * Increment overrun to account for the lost events.
1587                         */
1588                        local_add(page_entries, &cpu_buffer->overrun);
1589                        local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1590                }
1591
1592                /*
1593                 * We have already removed references to this list item, just
1594                 * free up the buffer_page and its page
1595                 */
1596                free_buffer_page(to_remove_page);
1597                nr_removed--;
1598
1599        } while (to_remove_page != last_page);
1600
1601        RB_WARN_ON(cpu_buffer, nr_removed);
1602
1603        return nr_removed == 0;
1604}
1605
1606static int
1607rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1608{
1609        struct list_head *pages = &cpu_buffer->new_pages;
1610        int retries, success;
1611
1612        raw_spin_lock_irq(&cpu_buffer->reader_lock);
1613        /*
1614         * We are holding the reader lock, so the reader page won't be swapped
1615         * in the ring buffer. Now we are racing with the writer trying to
1616         * move head page and the tail page.
1617         * We are going to adapt the reader page update process where:
1618         * 1. We first splice the start and end of list of new pages between
1619         *    the head page and its previous page.
1620         * 2. We cmpxchg the prev_page->next to point from head page to the
1621         *    start of new pages list.
1622         * 3. Finally, we update the head->prev to the end of new list.
1623         *
1624         * We will try this process 10 times, to make sure that we don't keep
1625         * spinning.
1626         */
1627        retries = 10;
1628        success = 0;
1629        while (retries--) {
1630                struct list_head *head_page, *prev_page, *r;
1631                struct list_head *last_page, *first_page;
1632                struct list_head *head_page_with_bit;
1633
1634                head_page = &rb_set_head_page(cpu_buffer)->list;
1635                if (!head_page)
1636                        break;
1637                prev_page = head_page->prev;
1638
1639                first_page = pages->next;
1640                last_page  = pages->prev;
1641
1642                head_page_with_bit = (struct list_head *)
1643                                     ((unsigned long)head_page | RB_PAGE_HEAD);
1644
1645                last_page->next = head_page_with_bit;
1646                first_page->prev = prev_page;
1647
1648                r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
1649
1650                if (r == head_page_with_bit) {
1651                        /*
1652                         * yay, we replaced the page pointer to our new list,
1653                         * now, we just have to update to head page's prev
1654                         * pointer to point to end of list
1655                         */
1656                        head_page->prev = last_page;
1657                        success = 1;
1658                        break;
1659                }
1660        }
1661
1662        if (success)
1663                INIT_LIST_HEAD(pages);
1664        /*
1665         * If we weren't successful in adding in new pages, warn and stop
1666         * tracing
1667         */
1668        RB_WARN_ON(cpu_buffer, !success);
1669        raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1670
1671        /* free pages if they weren't inserted */
1672        if (!success) {
1673                struct buffer_page *bpage, *tmp;
1674                list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1675                                         list) {
1676                        list_del_init(&bpage->list);
1677                        free_buffer_page(bpage);
1678                }
1679        }
1680        return success;
1681}
1682
1683static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
1684{
1685        int success;
1686
1687        if (cpu_buffer->nr_pages_to_update > 0)
1688                success = rb_insert_pages(cpu_buffer);
1689        else
1690                success = rb_remove_pages(cpu_buffer,
1691                                        -cpu_buffer->nr_pages_to_update);
1692
1693        if (success)
1694                cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
1695}
1696
1697static void update_pages_handler(struct work_struct *work)
1698{
1699        struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
1700                        struct ring_buffer_per_cpu, update_pages_work);
1701        rb_update_pages(cpu_buffer);
1702        complete(&cpu_buffer->update_done);
1703}
1704
1705/**
1706 * ring_buffer_resize - resize the ring buffer
1707 * @buffer: the buffer to resize.
1708 * @size: the new size.
1709 * @cpu_id: the cpu buffer to resize
1710 *
1711 * Minimum size is 2 * BUF_PAGE_SIZE.
1712 *
1713 * Returns 0 on success and < 0 on failure.
1714 */
1715int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
1716                        int cpu_id)
1717{
1718        struct ring_buffer_per_cpu *cpu_buffer;
1719        unsigned long nr_pages;
1720        int cpu, err = 0;
1721
1722        /*
1723         * Always succeed at resizing a non-existent buffer:
1724         */
1725        if (!buffer)
1726                return size;
1727
1728        /* Make sure the requested buffer exists */
1729        if (cpu_id != RING_BUFFER_ALL_CPUS &&
1730            !cpumask_test_cpu(cpu_id, buffer->cpumask))
1731                return size;
1732
1733        nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1734
1735        /* we need a minimum of two pages */
1736        if (nr_pages < 2)
1737                nr_pages = 2;
1738
1739        size = nr_pages * BUF_PAGE_SIZE;
1740
1741        /*
1742         * Don't succeed if resizing is disabled, as a reader might be
1743         * manipulating the ring buffer and is expecting a sane state while
1744         * this is true.
1745         */
1746        if (atomic_read(&buffer->resize_disabled))
1747                return -EBUSY;
1748
1749        /* prevent another thread from changing buffer sizes */
1750        mutex_lock(&buffer->mutex);
1751
1752        if (cpu_id == RING_BUFFER_ALL_CPUS) {
1753                /* calculate the pages to update */
1754                for_each_buffer_cpu(buffer, cpu) {
1755                        cpu_buffer = buffer->buffers[cpu];
1756
1757                        cpu_buffer->nr_pages_to_update = nr_pages -
1758                                                        cpu_buffer->nr_pages;
1759                        /*
1760                         * nothing more to do for removing pages or no update
1761                         */
1762                        if (cpu_buffer->nr_pages_to_update <= 0)
1763                                continue;
1764                        /*
1765                         * to add pages, make sure all new pages can be
1766                         * allocated without receiving ENOMEM
1767                         */
1768                        INIT_LIST_HEAD(&cpu_buffer->new_pages);
1769                        if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1770                                                &cpu_buffer->new_pages, cpu)) {
1771                                /* not enough memory for new pages */
1772                                err = -ENOMEM;
1773                                goto out_err;
1774                        }
1775                }
1776
1777                get_online_cpus();
1778                /*
1779                 * Fire off all the required work handlers
1780                 * We can't schedule on offline CPUs, but it's not necessary
1781                 * since we can change their buffer sizes without any race.
1782                 */
1783                for_each_buffer_cpu(buffer, cpu) {
1784                        cpu_buffer = buffer->buffers[cpu];
1785                        if (!cpu_buffer->nr_pages_to_update)
1786                                continue;
1787
1788                        /* Can't run something on an offline CPU. */
1789                        if (!cpu_online(cpu)) {
1790                                rb_update_pages(cpu_buffer);
1791                                cpu_buffer->nr_pages_to_update = 0;
1792                        } else {
1793                                schedule_work_on(cpu,
1794                                                &cpu_buffer->update_pages_work);
1795                        }
1796                }
1797
1798                /* wait for all the updates to complete */
1799                for_each_buffer_cpu(buffer, cpu) {
1800                        cpu_buffer = buffer->buffers[cpu];
1801                        if (!cpu_buffer->nr_pages_to_update)
1802                                continue;
1803
1804                        if (cpu_online(cpu))
1805                                wait_for_completion(&cpu_buffer->update_done);
1806                        cpu_buffer->nr_pages_to_update = 0;
1807                }
1808
1809                put_online_cpus();
1810        } else {
1811                /* Make sure this CPU has been initialized */
1812                if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
1813                        goto out;
1814
1815                cpu_buffer = buffer->buffers[cpu_id];
1816
1817                if (nr_pages == cpu_buffer->nr_pages)
1818                        goto out;
1819
1820                cpu_buffer->nr_pages_to_update = nr_pages -
1821                                                cpu_buffer->nr_pages;
1822
1823                INIT_LIST_HEAD(&cpu_buffer->new_pages);
1824                if (cpu_buffer->nr_pages_to_update > 0 &&
1825                        __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1826                                            &cpu_buffer->new_pages, cpu_id)) {
1827                        err = -ENOMEM;
1828                        goto out_err;
1829                }
1830
1831                get_online_cpus();
1832
1833                /* Can't run something on an offline CPU. */
1834                if (!cpu_online(cpu_id))
1835                        rb_update_pages(cpu_buffer);
1836                else {
1837                        schedule_work_on(cpu_id,
1838                                         &cpu_buffer->update_pages_work);
1839                        wait_for_completion(&cpu_buffer->update_done);
1840                }
1841
1842                cpu_buffer->nr_pages_to_update = 0;
1843                put_online_cpus();
1844        }
1845
1846 out:
1847        /*
1848         * The ring buffer resize can happen with the ring buffer
1849         * enabled, so that the update disturbs the tracing as little
1850         * as possible. But if the buffer is disabled, we do not need
1851         * to worry about that, and we can take the time to verify
1852         * that the buffer is not corrupt.
1853         */
1854        if (atomic_read(&buffer->record_disabled)) {
1855                atomic_inc(&buffer->record_disabled);
1856                /*
1857                 * Even though the buffer was disabled, we must make sure
1858                 * that it is truly disabled before calling rb_check_pages.
1859                 * There could have been a race between checking
1860                 * record_disable and incrementing it.
1861                 */
1862                synchronize_rcu();
1863                for_each_buffer_cpu(buffer, cpu) {
1864                        cpu_buffer = buffer->buffers[cpu];
1865                        rb_check_pages(cpu_buffer);
1866                }
1867                atomic_dec(&buffer->record_disabled);
1868        }
1869
1870        mutex_unlock(&buffer->mutex);
1871        return size;
1872
1873 out_err:
1874        for_each_buffer_cpu(buffer, cpu) {
1875                struct buffer_page *bpage, *tmp;
1876
1877                cpu_buffer = buffer->buffers[cpu];
1878                cpu_buffer->nr_pages_to_update = 0;
1879
1880                if (list_empty(&cpu_buffer->new_pages))
1881                        continue;
1882
1883                list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1884                                        list) {
1885                        list_del_init(&bpage->list);
1886                        free_buffer_page(bpage);
1887                }
1888        }
1889        mutex_unlock(&buffer->mutex);
1890        return err;
1891}
1892EXPORT_SYMBOL_GPL(ring_buffer_resize);
1893
1894void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
1895{
1896        mutex_lock(&buffer->mutex);
1897        if (val)
1898                buffer->flags |= RB_FL_OVERWRITE;
1899        else
1900                buffer->flags &= ~RB_FL_OVERWRITE;
1901        mutex_unlock(&buffer->mutex);
1902}
1903EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
1904
1905static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
1906{
1907        return bpage->page->data + index;
1908}
1909
1910static __always_inline struct ring_buffer_event *
1911rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
1912{
1913        return __rb_page_index(cpu_buffer->reader_page,
1914                               cpu_buffer->reader_page->read);
1915}
1916
1917static __always_inline struct ring_buffer_event *
1918rb_iter_head_event(struct ring_buffer_iter *iter)
1919{
1920        return __rb_page_index(iter->head_page, iter->head);
1921}
1922
1923static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
1924{
1925        return local_read(&bpage->page->commit);
1926}
1927
1928/* Size is determined by what has been committed */
1929static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
1930{
1931        return rb_page_commit(bpage);
1932}
1933
1934static __always_inline unsigned
1935rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
1936{
1937        return rb_page_commit(cpu_buffer->commit_page);
1938}
1939
1940static __always_inline unsigned
1941rb_event_index(struct ring_buffer_event *event)
1942{
1943        unsigned long addr = (unsigned long)event;
1944
1945        return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
1946}
1947
1948static void rb_inc_iter(struct ring_buffer_iter *iter)
1949{
1950        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1951
1952        /*
1953         * The iterator could be on the reader page (it starts there).
1954         * But the head could have moved, since the reader was
1955         * found. Check for this case and assign the iterator
1956         * to the head page instead of next.
1957         */
1958        if (iter->head_page == cpu_buffer->reader_page)
1959                iter->head_page = rb_set_head_page(cpu_buffer);
1960        else
1961                rb_inc_page(cpu_buffer, &iter->head_page);
1962
1963        iter->read_stamp = iter->head_page->page->time_stamp;
1964        iter->head = 0;
1965}
1966
1967/*
1968 * rb_handle_head_page - writer hit the head page
1969 *
1970 * Returns: +1 to retry page
1971 *           0 to continue
1972 *          -1 on error
1973 */
1974static int
1975rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
1976                    struct buffer_page *tail_page,
1977                    struct buffer_page *next_page)
1978{
1979        struct buffer_page *new_head;
1980        int entries;
1981        int type;
1982        int ret;
1983
1984        entries = rb_page_entries(next_page);
1985
1986        /*
1987         * The hard part is here. We need to move the head
1988         * forward, and protect against both readers on
1989         * other CPUs and writers coming in via interrupts.
1990         */
1991        type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
1992                                       RB_PAGE_HEAD);
1993
1994        /*
1995         * type can be one of four:
1996         *  NORMAL - an interrupt already moved it for us
1997         *  HEAD   - we are the first to get here.
1998         *  UPDATE - we are the interrupt interrupting
1999         *           a current move.
2000         *  MOVED  - a reader on another CPU moved the next
2001         *           pointer to its reader page. Give up
2002         *           and try again.
2003         */
2004
2005        switch (type) {
2006        case RB_PAGE_HEAD:
2007                /*
2008                 * We changed the head to UPDATE, thus
2009                 * it is our responsibility to update
2010                 * the counters.
2011                 */
2012                local_add(entries, &cpu_buffer->overrun);
2013                local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
2014
2015                /*
2016                 * The entries will be zeroed out when we move the
2017                 * tail page.
2018                 */
2019
2020                /* still more to do */
2021                break;
2022
2023        case RB_PAGE_UPDATE:
2024                /*
2025                 * This is an interrupt that interrupt the
2026                 * previous update. Still more to do.
2027                 */
2028                break;
2029        case RB_PAGE_NORMAL:
2030                /*
2031                 * An interrupt came in before the update
2032                 * and processed this for us.
2033                 * Nothing left to do.
2034                 */
2035                return 1;
2036        case RB_PAGE_MOVED:
2037                /*
2038                 * The reader is on another CPU and just did
2039                 * a swap with our next_page.
2040                 * Try again.
2041                 */
2042                return 1;
2043        default:
2044                RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2045                return -1;
2046        }
2047
2048        /*
2049         * Now that we are here, the old head pointer is
2050         * set to UPDATE. This will keep the reader from
2051         * swapping the head page with the reader page.
2052         * The reader (on another CPU) will spin till
2053         * we are finished.
2054         *
2055         * We just need to protect against interrupts
2056         * doing the job. We will set the next pointer
2057         * to HEAD. After that, we set the old pointer
2058         * to NORMAL, but only if it was HEAD before.
2059         * otherwise we are an interrupt, and only
2060         * want the outer most commit to reset it.
2061         */
2062        new_head = next_page;
2063        rb_inc_page(cpu_buffer, &new_head);
2064
2065        ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2066                                    RB_PAGE_NORMAL);
2067
2068        /*
2069         * Valid returns are:
2070         *  HEAD   - an interrupt came in and already set it.
2071         *  NORMAL - One of two things:
2072         *            1) We really set it.
2073         *            2) A bunch of interrupts came in and moved
2074         *               the page forward again.
2075         */
2076        switch (ret) {
2077        case RB_PAGE_HEAD:
2078        case RB_PAGE_NORMAL:
2079                /* OK */
2080                break;
2081        default:
2082                RB_WARN_ON(cpu_buffer, 1);
2083                return -1;
2084        }
2085
2086        /*
2087         * It is possible that an interrupt came in,
2088         * set the head up, then more interrupts came in
2089         * and moved it again. When we get back here,
2090         * the page would have been set to NORMAL but we
2091         * just set it back to HEAD.
2092         *
2093         * How do you detect this? Well, if that happened
2094         * the tail page would have moved.
2095         */
2096        if (ret == RB_PAGE_NORMAL) {
2097                struct buffer_page *buffer_tail_page;
2098
2099                buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2100                /*
2101                 * If the tail had moved passed next, then we need
2102                 * to reset the pointer.
2103                 */
2104                if (buffer_tail_page != tail_page &&
2105                    buffer_tail_page != next_page)
2106                        rb_head_page_set_normal(cpu_buffer, new_head,
2107                                                next_page,
2108                                                RB_PAGE_HEAD);
2109        }
2110
2111        /*
2112         * If this was the outer most commit (the one that
2113         * changed the original pointer from HEAD to UPDATE),
2114         * then it is up to us to reset it to NORMAL.
2115         */
2116        if (type == RB_PAGE_HEAD) {
2117                ret = rb_head_page_set_normal(cpu_buffer, next_page,
2118                                              tail_page,
2119                                              RB_PAGE_UPDATE);
2120                if (RB_WARN_ON(cpu_buffer,
2121                               ret != RB_PAGE_UPDATE))
2122                        return -1;
2123        }
2124
2125        return 0;
2126}
2127
2128static inline void
2129rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2130              unsigned long tail, struct rb_event_info *info)
2131{
2132        struct buffer_page *tail_page = info->tail_page;
2133        struct ring_buffer_event *event;
2134        unsigned long length = info->length;
2135
2136        /*
2137         * Only the event that crossed the page boundary
2138         * must fill the old tail_page with padding.
2139         */
2140        if (tail >= BUF_PAGE_SIZE) {
2141                /*
2142                 * If the page was filled, then we still need
2143                 * to update the real_end. Reset it to zero
2144                 * and the reader will ignore it.
2145                 */
2146                if (tail == BUF_PAGE_SIZE)
2147                        tail_page->real_end = 0;
2148
2149                local_sub(length, &tail_page->write);
2150                return;
2151        }
2152
2153        event = __rb_page_index(tail_page, tail);
2154
2155        /* account for padding bytes */
2156        local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2157
2158        /*
2159         * Save the original length to the meta data.
2160         * This will be used by the reader to add lost event
2161         * counter.
2162         */
2163        tail_page->real_end = tail;
2164
2165        /*
2166         * If this event is bigger than the minimum size, then
2167         * we need to be careful that we don't subtract the
2168         * write counter enough to allow another writer to slip
2169         * in on this page.
2170         * We put in a discarded commit instead, to make sure
2171         * that this space is not used again.
2172         *
2173         * If we are less than the minimum size, we don't need to
2174         * worry about it.
2175         */
2176        if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2177                /* No room for any events */
2178
2179                /* Mark the rest of the page with padding */
2180                rb_event_set_padding(event);
2181
2182                /* Set the write back to the previous setting */
2183                local_sub(length, &tail_page->write);
2184                return;
2185        }
2186
2187        /* Put in a discarded event */
2188        event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2189        event->type_len = RINGBUF_TYPE_PADDING;
2190        /* time delta must be non zero */
2191        event->time_delta = 1;
2192
2193        /* Set write to end of buffer */
2194        length = (tail + length) - BUF_PAGE_SIZE;
2195        local_sub(length, &tail_page->write);
2196}
2197
2198static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2199
2200/*
2201 * This is the slow path, force gcc not to inline it.
2202 */
2203static noinline struct ring_buffer_event *
2204rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2205             unsigned long tail, struct rb_event_info *info)
2206{
2207        struct buffer_page *tail_page = info->tail_page;
2208        struct buffer_page *commit_page = cpu_buffer->commit_page;
2209        struct ring_buffer *buffer = cpu_buffer->buffer;
2210        struct buffer_page *next_page;
2211        int ret;
2212
2213        next_page = tail_page;
2214
2215        rb_inc_page(cpu_buffer, &next_page);
2216
2217        /*
2218         * If for some reason, we had an interrupt storm that made
2219         * it all the way around the buffer, bail, and warn
2220         * about it.
2221         */
2222        if (unlikely(next_page == commit_page)) {
2223                local_inc(&cpu_buffer->commit_overrun);
2224                goto out_reset;
2225        }
2226
2227        /*
2228         * This is where the fun begins!
2229         *
2230         * We are fighting against races between a reader that
2231         * could be on another CPU trying to swap its reader
2232         * page with the buffer head.
2233         *
2234         * We are also fighting against interrupts coming in and
2235         * moving the head or tail on us as well.
2236         *
2237         * If the next page is the head page then we have filled
2238         * the buffer, unless the commit page is still on the
2239         * reader page.
2240         */
2241        if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2242
2243                /*
2244                 * If the commit is not on the reader page, then
2245                 * move the header page.
2246                 */
2247                if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2248                        /*
2249                         * If we are not in overwrite mode,
2250                         * this is easy, just stop here.
2251                         */
2252                        if (!(buffer->flags & RB_FL_OVERWRITE)) {
2253                                local_inc(&cpu_buffer->dropped_events);
2254                                goto out_reset;
2255                        }
2256
2257                        ret = rb_handle_head_page(cpu_buffer,
2258                                                  tail_page,
2259                                                  next_page);
2260                        if (ret < 0)
2261                                goto out_reset;
2262                        if (ret)
2263                                goto out_again;
2264                } else {
2265                        /*
2266                         * We need to be careful here too. The
2267                         * commit page could still be on the reader
2268                         * page. We could have a small buffer, and
2269                         * have filled up the buffer with events
2270                         * from interrupts and such, and wrapped.
2271                         *
2272                         * Note, if the tail page is also the on the
2273                         * reader_page, we let it move out.
2274                         */
2275                        if (unlikely((cpu_buffer->commit_page !=
2276                                      cpu_buffer->tail_page) &&
2277                                     (cpu_buffer->commit_page ==
2278                                      cpu_buffer->reader_page))) {
2279                                local_inc(&cpu_buffer->commit_overrun);
2280                                goto out_reset;
2281                        }
2282                }
2283        }
2284
2285        rb_tail_page_update(cpu_buffer, tail_page, next_page);
2286
2287 out_again:
2288
2289        rb_reset_tail(cpu_buffer, tail, info);
2290
2291        /* Commit what we have for now. */
2292        rb_end_commit(cpu_buffer);
2293        /* rb_end_commit() decs committing */
2294        local_inc(&cpu_buffer->committing);
2295
2296        /* fail and let the caller try again */
2297        return ERR_PTR(-EAGAIN);
2298
2299 out_reset:
2300        /* reset write */
2301        rb_reset_tail(cpu_buffer, tail, info);
2302
2303        return NULL;
2304}
2305
2306/* Slow path, do not inline */
2307static noinline struct ring_buffer_event *
2308rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2309{
2310        if (abs)
2311                event->type_len = RINGBUF_TYPE_TIME_STAMP;
2312        else
2313                event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2314
2315        /* Not the first event on the page, or not delta? */
2316        if (abs || rb_event_index(event)) {
2317                event->time_delta = delta & TS_MASK;
2318                event->array[0] = delta >> TS_SHIFT;
2319        } else {
2320                /* nope, just zero it */
2321                event->time_delta = 0;
2322                event->array[0] = 0;
2323        }
2324
2325        return skip_time_extend(event);
2326}
2327
2328static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2329                                     struct ring_buffer_event *event);
2330
2331/**
2332 * rb_update_event - update event type and data
2333 * @event: the event to update
2334 * @type: the type of event
2335 * @length: the size of the event field in the ring buffer
2336 *
2337 * Update the type and data fields of the event. The length
2338 * is the actual size that is written to the ring buffer,
2339 * and with this, we can determine what to place into the
2340 * data field.
2341 */
2342static void
2343rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2344                struct ring_buffer_event *event,
2345                struct rb_event_info *info)
2346{
2347        unsigned length = info->length;
2348        u64 delta = info->delta;
2349
2350        /* Only a commit updates the timestamp */
2351        if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
2352                delta = 0;
2353
2354        /*
2355         * If we need to add a timestamp, then we
2356         * add it to the start of the reserved space.
2357         */
2358        if (unlikely(info->add_timestamp)) {
2359                bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
2360
2361                event = rb_add_time_stamp(event, info->delta, abs);
2362                length -= RB_LEN_TIME_EXTEND;
2363                delta = 0;
2364        }
2365
2366        event->time_delta = delta;
2367        length -= RB_EVNT_HDR_SIZE;
2368        if (length > RB_MAX_SMALL_DATA) {
2369                event->type_len = 0;
2370                event->array[0] = length;
2371        } else
2372                event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2373}
2374
2375static unsigned rb_calculate_event_length(unsigned length)
2376{
2377        struct ring_buffer_event event; /* Used only for sizeof array */
2378
2379        /* zero length can cause confusions */
2380        if (!length)
2381                length++;
2382
2383        if (length > RB_MAX_SMALL_DATA)
2384                length += sizeof(event.array[0]);
2385
2386        length += RB_EVNT_HDR_SIZE;
2387        length = ALIGN(length, RB_ALIGNMENT);
2388
2389        /*
2390         * In case the time delta is larger than the 27 bits for it
2391         * in the header, we need to add a timestamp. If another
2392         * event comes in when trying to discard this one to increase
2393         * the length, then the timestamp will be added in the allocated
2394         * space of this event. If length is bigger than the size needed
2395         * for the TIME_EXTEND, then padding has to be used. The events
2396         * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2397         * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2398         * As length is a multiple of 4, we only need to worry if it
2399         * is 12 (RB_LEN_TIME_EXTEND + 4).
2400         */
2401        if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2402                length += RB_ALIGNMENT;
2403
2404        return length;
2405}
2406
2407#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2408static inline bool sched_clock_stable(void)
2409{
2410        return true;
2411}
2412#endif
2413
2414static inline int
2415rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2416                  struct ring_buffer_event *event)
2417{
2418        unsigned long new_index, old_index;
2419        struct buffer_page *bpage;
2420        unsigned long index;
2421        unsigned long addr;
2422
2423        new_index = rb_event_index(event);
2424        old_index = new_index + rb_event_ts_length(event);
2425        addr = (unsigned long)event;
2426        addr &= PAGE_MASK;
2427
2428        bpage = READ_ONCE(cpu_buffer->tail_page);
2429
2430        if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2431                unsigned long write_mask =
2432                        local_read(&bpage->write) & ~RB_WRITE_MASK;
2433                unsigned long event_length = rb_event_length(event);
2434                /*
2435                 * This is on the tail page. It is possible that
2436                 * a write could come in and move the tail page
2437                 * and write to the next page. That is fine
2438                 * because we just shorten what is on this page.
2439                 */
2440                old_index += write_mask;
2441                new_index += write_mask;
2442                index = local_cmpxchg(&bpage->write, old_index, new_index);
2443                if (index == old_index) {
2444                        /* update counters */
2445                        local_sub(event_length, &cpu_buffer->entries_bytes);
2446                        return 1;
2447                }
2448        }
2449
2450        /* could not discard */
2451        return 0;
2452}
2453
2454static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2455{
2456        local_inc(&cpu_buffer->committing);
2457        local_inc(&cpu_buffer->commits);
2458}
2459
2460static __always_inline void
2461rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2462{
2463        unsigned long max_count;
2464
2465        /*
2466         * We only race with interrupts and NMIs on this CPU.
2467         * If we own the commit event, then we can commit
2468         * all others that interrupted us, since the interruptions
2469         * are in stack format (they finish before they come
2470         * back to us). This allows us to do a simple loop to
2471         * assign the commit to the tail.
2472         */
2473 again:
2474        max_count = cpu_buffer->nr_pages * 100;
2475
2476        while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2477                if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2478                        return;
2479                if (RB_WARN_ON(cpu_buffer,
2480                               rb_is_reader_page(cpu_buffer->tail_page)))
2481                        return;
2482                local_set(&cpu_buffer->commit_page->page->commit,
2483                          rb_page_write(cpu_buffer->commit_page));
2484                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
2485                /* Only update the write stamp if the page has an event */
2486                if (rb_page_write(cpu_buffer->commit_page))
2487                        cpu_buffer->write_stamp =
2488                                cpu_buffer->commit_page->page->time_stamp;
2489                /* add barrier to keep gcc from optimizing too much */
2490                barrier();
2491        }
2492        while (rb_commit_index(cpu_buffer) !=
2493               rb_page_write(cpu_buffer->commit_page)) {
2494
2495                local_set(&cpu_buffer->commit_page->page->commit,
2496                          rb_page_write(cpu_buffer->commit_page));
2497                RB_WARN_ON(cpu_buffer,
2498                           local_read(&cpu_buffer->commit_page->page->commit) &
2499                           ~RB_WRITE_MASK);
2500                barrier();
2501        }
2502
2503        /* again, keep gcc from optimizing */
2504        barrier();
2505
2506        /*
2507         * If an interrupt came in just after the first while loop
2508         * and pushed the tail page forward, we will be left with
2509         * a dangling commit that will never go forward.
2510         */
2511        if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
2512                goto again;
2513}
2514
2515static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2516{
2517        unsigned long commits;
2518
2519        if (RB_WARN_ON(cpu_buffer,
2520                       !local_read(&cpu_buffer->committing)))
2521                return;
2522
2523 again:
2524        commits = local_read(&cpu_buffer->commits);
2525        /* synchronize with interrupts */
2526        barrier();
2527        if (local_read(&cpu_buffer->committing) == 1)
2528                rb_set_commit_to_write(cpu_buffer);
2529
2530        local_dec(&cpu_buffer->committing);
2531
2532        /* synchronize with interrupts */
2533        barrier();
2534
2535        /*
2536         * Need to account for interrupts coming in between the
2537         * updating of the commit page and the clearing of the
2538         * committing counter.
2539         */
2540        if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2541            !local_read(&cpu_buffer->committing)) {
2542                local_inc(&cpu_buffer->committing);
2543                goto again;
2544        }
2545}
2546
2547static inline void rb_event_discard(struct ring_buffer_event *event)
2548{
2549        if (extended_time(event))
2550                event = skip_time_extend(event);
2551
2552        /* array[0] holds the actual length for the discarded event */
2553        event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2554        event->type_len = RINGBUF_TYPE_PADDING;
2555        /* time delta must be non zero */
2556        if (!event->time_delta)
2557                event->time_delta = 1;
2558}
2559
2560static __always_inline bool
2561rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2562                   struct ring_buffer_event *event)
2563{
2564        unsigned long addr = (unsigned long)event;
2565        unsigned long index;
2566
2567        index = rb_event_index(event);
2568        addr &= PAGE_MASK;
2569
2570        return cpu_buffer->commit_page->page == (void *)addr &&
2571                rb_commit_index(cpu_buffer) == index;
2572}
2573
2574static __always_inline void
2575rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2576                      struct ring_buffer_event *event)
2577{
2578        u64 delta;
2579
2580        /*
2581         * The event first in the commit queue updates the
2582         * time stamp.
2583         */
2584        if (rb_event_is_commit(cpu_buffer, event)) {
2585                /*
2586                 * A commit event that is first on a page
2587                 * updates the write timestamp with the page stamp
2588                 */
2589                if (!rb_event_index(event))
2590                        cpu_buffer->write_stamp =
2591                                cpu_buffer->commit_page->page->time_stamp;
2592                else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2593                        delta = ring_buffer_event_time_stamp(event);
2594                        cpu_buffer->write_stamp += delta;
2595                } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
2596                        delta = ring_buffer_event_time_stamp(event);
2597                        cpu_buffer->write_stamp = delta;
2598                } else
2599                        cpu_buffer->write_stamp += event->time_delta;
2600        }
2601}
2602
2603static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2604                      struct ring_buffer_event *event)
2605{
2606        local_inc(&cpu_buffer->entries);
2607        rb_update_write_stamp(cpu_buffer, event);
2608        rb_end_commit(cpu_buffer);
2609}
2610
2611static __always_inline void
2612rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
2613{
2614        size_t nr_pages;
2615        size_t dirty;
2616        size_t full;
2617
2618        if (buffer->irq_work.waiters_pending) {
2619                buffer->irq_work.waiters_pending = false;
2620                /* irq_work_queue() supplies it's own memory barriers */
2621                irq_work_queue(&buffer->irq_work.work);
2622        }
2623
2624        if (cpu_buffer->irq_work.waiters_pending) {
2625                cpu_buffer->irq_work.waiters_pending = false;
2626                /* irq_work_queue() supplies it's own memory barriers */
2627                irq_work_queue(&cpu_buffer->irq_work.work);
2628        }
2629
2630        if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
2631                return;
2632
2633        if (cpu_buffer->reader_page == cpu_buffer->commit_page)
2634                return;
2635
2636        if (!cpu_buffer->irq_work.full_waiters_pending)
2637                return;
2638
2639        cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
2640
2641        full = cpu_buffer->shortest_full;
2642        nr_pages = cpu_buffer->nr_pages;
2643        dirty = ring_buffer_nr_dirty_pages(buffer, cpu_buffer->cpu);
2644        if (full && nr_pages && (dirty * 100) <= full * nr_pages)
2645                return;
2646
2647        cpu_buffer->irq_work.wakeup_full = true;
2648        cpu_buffer->irq_work.full_waiters_pending = false;
2649        /* irq_work_queue() supplies it's own memory barriers */
2650        irq_work_queue(&cpu_buffer->irq_work.work);
2651}
2652
2653/*
2654 * The lock and unlock are done within a preempt disable section.
2655 * The current_context per_cpu variable can only be modified
2656 * by the current task between lock and unlock. But it can
2657 * be modified more than once via an interrupt. To pass this
2658 * information from the lock to the unlock without having to
2659 * access the 'in_interrupt()' functions again (which do show
2660 * a bit of overhead in something as critical as function tracing,
2661 * we use a bitmask trick.
2662 *
2663 *  bit 0 =  NMI context
2664 *  bit 1 =  IRQ context
2665 *  bit 2 =  SoftIRQ context
2666 *  bit 3 =  normal context.
2667 *
2668 * This works because this is the order of contexts that can
2669 * preempt other contexts. A SoftIRQ never preempts an IRQ
2670 * context.
2671 *
2672 * When the context is determined, the corresponding bit is
2673 * checked and set (if it was set, then a recursion of that context
2674 * happened).
2675 *
2676 * On unlock, we need to clear this bit. To do so, just subtract
2677 * 1 from the current_context and AND it to itself.
2678 *
2679 * (binary)
2680 *  101 - 1 = 100
2681 *  101 & 100 = 100 (clearing bit zero)
2682 *
2683 *  1010 - 1 = 1001
2684 *  1010 & 1001 = 1000 (clearing bit 1)
2685 *
2686 * The least significant bit can be cleared this way, and it
2687 * just so happens that it is the same bit corresponding to
2688 * the current context.
2689 */
2690
2691static __always_inline int
2692trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
2693{
2694        unsigned int val = cpu_buffer->current_context;
2695        unsigned long pc = preempt_count();
2696        int bit;
2697
2698        if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
2699                bit = RB_CTX_NORMAL;
2700        else
2701                bit = pc & NMI_MASK ? RB_CTX_NMI :
2702                        pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;
2703
2704        if (unlikely(val & (1 << (bit + cpu_buffer->nest))))
2705                return 1;
2706
2707        val |= (1 << (bit + cpu_buffer->nest));
2708        cpu_buffer->current_context = val;
2709
2710        return 0;
2711}
2712
2713static __always_inline void
2714trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
2715{
2716        cpu_buffer->current_context &=
2717                cpu_buffer->current_context - (1 << cpu_buffer->nest);
2718}
2719
2720/* The recursive locking above uses 4 bits */
2721#define NESTED_BITS 4
2722
2723/**
2724 * ring_buffer_nest_start - Allow to trace while nested
2725 * @buffer: The ring buffer to modify
2726 *
2727 * The ring buffer has a safety mechanism to prevent recursion.
2728 * But there may be a case where a trace needs to be done while
2729 * tracing something else. In this case, calling this function
2730 * will allow this function to nest within a currently active
2731 * ring_buffer_lock_reserve().
2732 *
2733 * Call this function before calling another ring_buffer_lock_reserve() and
2734 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
2735 */
2736void ring_buffer_nest_start(struct ring_buffer *buffer)
2737{
2738        struct ring_buffer_per_cpu *cpu_buffer;
2739        int cpu;
2740
2741        /* Enabled by ring_buffer_nest_end() */
2742        preempt_disable_notrace();
2743        cpu = raw_smp_processor_id();
2744        cpu_buffer = buffer->buffers[cpu];
2745        /* This is the shift value for the above recursive locking */
2746        cpu_buffer->nest += NESTED_BITS;
2747}
2748
2749/**
2750 * ring_buffer_nest_end - Allow to trace while nested
2751 * @buffer: The ring buffer to modify
2752 *
2753 * Must be called after ring_buffer_nest_start() and after the
2754 * ring_buffer_unlock_commit().
2755 */
2756void ring_buffer_nest_end(struct ring_buffer *buffer)
2757{
2758        struct ring_buffer_per_cpu *cpu_buffer;
2759        int cpu;
2760
2761        /* disabled by ring_buffer_nest_start() */
2762        cpu = raw_smp_processor_id();
2763        cpu_buffer = buffer->buffers[cpu];
2764        /* This is the shift value for the above recursive locking */
2765        cpu_buffer->nest -= NESTED_BITS;
2766        preempt_enable_notrace();
2767}
2768
2769/**
2770 * ring_buffer_unlock_commit - commit a reserved
2771 * @buffer: The buffer to commit to
2772 * @event: The event pointer to commit.
2773 *
2774 * This commits the data to the ring buffer, and releases any locks held.
2775 *
2776 * Must be paired with ring_buffer_lock_reserve.
2777 */
2778int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2779                              struct ring_buffer_event *event)
2780{
2781        struct ring_buffer_per_cpu *cpu_buffer;
2782        int cpu = raw_smp_processor_id();
2783
2784        cpu_buffer = buffer->buffers[cpu];
2785
2786        rb_commit(cpu_buffer, event);
2787
2788        rb_wakeups(buffer, cpu_buffer);
2789
2790        trace_recursive_unlock(cpu_buffer);
2791
2792        preempt_enable_notrace();
2793
2794        return 0;
2795}
2796EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
2797
2798static noinline void
2799rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2800                    struct rb_event_info *info)
2801{
2802        WARN_ONCE(info->delta > (1ULL << 59),
2803                  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2804                  (unsigned long long)info->delta,
2805                  (unsigned long long)info->ts,
2806                  (unsigned long long)cpu_buffer->write_stamp,
2807                  sched_clock_stable() ? "" :
2808                  "If you just came from a suspend/resume,\n"
2809                  "please switch to the trace global clock:\n"
2810                  "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
2811                  "or add trace_clock=global to the kernel command line\n");
2812        info->add_timestamp = 1;
2813}
2814
2815static struct ring_buffer_event *
2816__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
2817                  struct rb_event_info *info)
2818{
2819        struct ring_buffer_event *event;
2820        struct buffer_page *tail_page;
2821        unsigned long tail, write;
2822
2823        /*
2824         * If the time delta since the last event is too big to
2825         * hold in the time field of the event, then we append a
2826         * TIME EXTEND event ahead of the data event.
2827         */
2828        if (unlikely(info->add_timestamp))
2829                info->length += RB_LEN_TIME_EXTEND;
2830
2831        /* Don't let the compiler play games with cpu_buffer->tail_page */
2832        tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
2833        write = local_add_return(info->length, &tail_page->write);
2834
2835        /* set write to only the index of the write */
2836        write &= RB_WRITE_MASK;
2837        tail = write - info->length;
2838
2839        /*
2840         * If this is the first commit on the page, then it has the same
2841         * timestamp as the page itself.
2842         */
2843        if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
2844                info->delta = 0;
2845
2846        /* See if we shot pass the end of this buffer page */
2847        if (unlikely(write > BUF_PAGE_SIZE))
2848                return rb_move_tail(cpu_buffer, tail, info);
2849
2850        /* We reserved something on the buffer */
2851
2852        event = __rb_page_index(tail_page, tail);
2853        rb_update_event(cpu_buffer, event, info);
2854
2855        local_inc(&tail_page->entries);
2856
2857        /*
2858         * If this is the first commit on the page, then update
2859         * its timestamp.
2860         */
2861        if (!tail)
2862                tail_page->page->time_stamp = info->ts;
2863
2864        /* account for these added bytes */
2865        local_add(info->length, &cpu_buffer->entries_bytes);
2866
2867        return event;
2868}
2869
2870static __always_inline struct ring_buffer_event *
2871rb_reserve_next_event(struct ring_buffer *buffer,
2872                      struct ring_buffer_per_cpu *cpu_buffer,
2873                      unsigned long length)
2874{
2875        struct ring_buffer_event *event;
2876        struct rb_event_info info;
2877        int nr_loops = 0;
2878        u64 diff;
2879
2880        rb_start_commit(cpu_buffer);
2881
2882#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2883        /*
2884         * Due to the ability to swap a cpu buffer from a buffer
2885         * it is possible it was swapped before we committed.
2886         * (committing stops a swap). We check for it here and
2887         * if it happened, we have to fail the write.
2888         */
2889        barrier();
2890        if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
2891                local_dec(&cpu_buffer->committing);
2892                local_dec(&cpu_buffer->commits);
2893                return NULL;
2894        }
2895#endif
2896
2897        info.length = rb_calculate_event_length(length);
2898 again:
2899        info.add_timestamp = 0;
2900        info.delta = 0;
2901
2902        /*
2903         * We allow for interrupts to reenter here and do a trace.
2904         * If one does, it will cause this original code to loop
2905         * back here. Even with heavy interrupts happening, this
2906         * should only happen a few times in a row. If this happens
2907         * 1000 times in a row, there must be either an interrupt
2908         * storm or we have something buggy.
2909         * Bail!
2910         */
2911        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
2912                goto out_fail;
2913
2914        info.ts = rb_time_stamp(cpu_buffer->buffer);
2915        diff = info.ts - cpu_buffer->write_stamp;
2916
2917        /* make sure this diff is calculated here */
2918        barrier();
2919
2920        if (ring_buffer_time_stamp_abs(buffer)) {
2921                info.delta = info.ts;
2922                rb_handle_timestamp(cpu_buffer, &info);
2923        } else /* Did the write stamp get updated already? */
2924                if (likely(info.ts >= cpu_buffer->write_stamp)) {
2925                info.delta = diff;
2926                if (unlikely(test_time_stamp(info.delta)))
2927                        rb_handle_timestamp(cpu_buffer, &info);
2928        }
2929
2930        event = __rb_reserve_next(cpu_buffer, &info);
2931
2932        if (unlikely(PTR_ERR(event) == -EAGAIN)) {
2933                if (info.add_timestamp)
2934                        info.length -= RB_LEN_TIME_EXTEND;
2935                goto again;
2936        }
2937
2938        if (!event)
2939                goto out_fail;
2940
2941        return event;
2942
2943 out_fail:
2944        rb_end_commit(cpu_buffer);
2945        return NULL;
2946}
2947
2948/**
2949 * ring_buffer_lock_reserve - reserve a part of the buffer
2950 * @buffer: the ring buffer to reserve from
2951 * @length: the length of the data to reserve (excluding event header)
2952 *
2953 * Returns a reserved event on the ring buffer to copy directly to.
2954 * The user of this interface will need to get the body to write into
2955 * and can use the ring_buffer_event_data() interface.
2956 *
2957 * The length is the length of the data needed, not the event length
2958 * which also includes the event header.
2959 *
2960 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
2961 * If NULL is returned, then nothing has been allocated or locked.
2962 */
2963struct ring_buffer_event *
2964ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
2965{
2966        struct ring_buffer_per_cpu *cpu_buffer;
2967        struct ring_buffer_event *event;
2968        int cpu;
2969
2970        /* If we are tracing schedule, we don't want to recurse */
2971        preempt_disable_notrace();
2972
2973        if (unlikely(atomic_read(&buffer->record_disabled)))
2974                goto out;
2975
2976        cpu = raw_smp_processor_id();
2977
2978        if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
2979                goto out;
2980
2981        cpu_buffer = buffer->buffers[cpu];
2982
2983        if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
2984                goto out;
2985
2986        if (unlikely(length > BUF_MAX_DATA_SIZE))
2987                goto out;
2988
2989        if (unlikely(trace_recursive_lock(cpu_buffer)))
2990                goto out;
2991
2992        event = rb_reserve_next_event(buffer, cpu_buffer, length);
2993        if (!event)
2994                goto out_unlock;
2995
2996        return event;
2997
2998 out_unlock:
2999        trace_recursive_unlock(cpu_buffer);
3000 out:
3001        preempt_enable_notrace();
3002        return NULL;
3003}
3004EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
3005
3006/*
3007 * Decrement the entries to the page that an event is on.
3008 * The event does not even need to exist, only the pointer
3009 * to the page it is on. This may only be called before the commit
3010 * takes place.
3011 */
3012static inline void
3013rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3014                   struct ring_buffer_event *event)
3015{
3016        unsigned long addr = (unsigned long)event;
3017        struct buffer_page *bpage = cpu_buffer->commit_page;
3018        struct buffer_page *start;
3019
3020        addr &= PAGE_MASK;
3021
3022        /* Do the likely case first */
3023        if (likely(bpage->page == (void *)addr)) {
3024                local_dec(&bpage->entries);
3025                return;
3026        }
3027
3028        /*
3029         * Because the commit page may be on the reader page we
3030         * start with the next page and check the end loop there.
3031         */
3032        rb_inc_page(cpu_buffer, &bpage);
3033        start = bpage;
3034        do {
3035                if (bpage->page == (void *)addr) {
3036                        local_dec(&bpage->entries);
3037                        return;
3038                }
3039                rb_inc_page(cpu_buffer, &bpage);
3040        } while (bpage != start);
3041
3042        /* commit not part of this buffer?? */
3043        RB_WARN_ON(cpu_buffer, 1);
3044}
3045
3046/**
3047 * ring_buffer_commit_discard - discard an event that has not been committed
3048 * @buffer: the ring buffer
3049 * @event: non committed event to discard
3050 *
3051 * Sometimes an event that is in the ring buffer needs to be ignored.
3052 * This function lets the user discard an event in the ring buffer
3053 * and then that event will not be read later.
3054 *
3055 * This function only works if it is called before the item has been
3056 * committed. It will try to free the event from the ring buffer
3057 * if another event has not been added behind it.
3058 *
3059 * If another event has been added behind it, it will set the event
3060 * up as discarded, and perform the commit.
3061 *
3062 * If this function is called, do not call ring_buffer_unlock_commit on
3063 * the event.
3064 */
3065void ring_buffer_discard_commit(struct ring_buffer *buffer,
3066                                struct ring_buffer_event *event)
3067{
3068        struct ring_buffer_per_cpu *cpu_buffer;
3069        int cpu;
3070
3071        /* The event is discarded regardless */
3072        rb_event_discard(event);
3073
3074        cpu = smp_processor_id();
3075        cpu_buffer = buffer->buffers[cpu];
3076
3077        /*
3078         * This must only be called if the event has not been
3079         * committed yet. Thus we can assume that preemption
3080         * is still disabled.
3081         */
3082        RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3083
3084        rb_decrement_entry(cpu_buffer, event);
3085        if (rb_try_to_discard(cpu_buffer, event))
3086                goto out;
3087
3088        /*
3089         * The commit is still visible by the reader, so we
3090         * must still update the timestamp.
3091         */
3092        rb_update_write_stamp(cpu_buffer, event);
3093 out:
3094        rb_end_commit(cpu_buffer);
3095
3096        trace_recursive_unlock(cpu_buffer);
3097
3098        preempt_enable_notrace();
3099
3100}
3101EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
3102
3103/**
3104 * ring_buffer_write - write data to the buffer without reserving
3105 * @buffer: The ring buffer to write to.
3106 * @length: The length of the data being written (excluding the event header)
3107 * @data: The data to write to the buffer.
3108 *
3109 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3110 * one function. If you already have the data to write to the buffer, it
3111 * may be easier to simply call this function.
3112 *
3113 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3114 * and not the length of the event which would hold the header.
3115 */
3116int ring_buffer_write(struct ring_buffer *buffer,
3117                      unsigned long length,
3118                      void *data)
3119{
3120        struct ring_buffer_per_cpu *cpu_buffer;
3121        struct ring_buffer_event *event;
3122        void *body;
3123        int ret = -EBUSY;
3124        int cpu;
3125
3126        preempt_disable_notrace();
3127
3128        if (atomic_read(&buffer->record_disabled))
3129                goto out;
3130
3131        cpu = raw_smp_processor_id();
3132
3133        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3134                goto out;
3135
3136        cpu_buffer = buffer->buffers[cpu];
3137
3138        if (atomic_read(&cpu_buffer->record_disabled))
3139                goto out;
3140
3141        if (length > BUF_MAX_DATA_SIZE)
3142                goto out;
3143
3144        if (unlikely(trace_recursive_lock(cpu_buffer)))
3145                goto out;
3146
3147        event = rb_reserve_next_event(buffer, cpu_buffer, length);
3148        if (!event)
3149                goto out_unlock;
3150
3151        body = rb_event_data(event);
3152
3153        memcpy(body, data, length);
3154
3155        rb_commit(cpu_buffer, event);
3156
3157        rb_wakeups(buffer, cpu_buffer);
3158
3159        ret = 0;
3160
3161 out_unlock:
3162        trace_recursive_unlock(cpu_buffer);
3163
3164 out:
3165        preempt_enable_notrace();
3166
3167        return ret;
3168}
3169EXPORT_SYMBOL_GPL(ring_buffer_write);
3170
3171static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3172{
3173        struct buffer_page *reader = cpu_buffer->reader_page;
3174        struct buffer_page *head = rb_set_head_page(cpu_buffer);
3175        struct buffer_page *commit = cpu_buffer->commit_page;
3176
3177        /* In case of error, head will be NULL */
3178        if (unlikely(!head))
3179                return true;
3180
3181        return reader->read == rb_page_commit(reader) &&
3182                (commit == reader ||
3183                 (commit == head &&
3184                  head->read == rb_page_commit(commit)));
3185}
3186
3187/**
3188 * ring_buffer_record_disable - stop all writes into the buffer
3189 * @buffer: The ring buffer to stop writes to.
3190 *
3191 * This prevents all writes to the buffer. Any attempt to write
3192 * to the buffer after this will fail and return NULL.
3193 *
3194 * The caller should call synchronize_rcu() after this.
3195 */
3196void ring_buffer_record_disable(struct ring_buffer *buffer)
3197{
3198        atomic_inc(&buffer->record_disabled);
3199}
3200EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3201
3202/**
3203 * ring_buffer_record_enable - enable writes to the buffer
3204 * @buffer: The ring buffer to enable writes
3205 *
3206 * Note, multiple disables will need the same number of enables
3207 * to truly enable the writing (much like preempt_disable).
3208 */
3209void ring_buffer_record_enable(struct ring_buffer *buffer)
3210{
3211        atomic_dec(&buffer->record_disabled);
3212}
3213EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
3214
3215/**
3216 * ring_buffer_record_off - stop all writes into the buffer
3217 * @buffer: The ring buffer to stop writes to.
3218 *
3219 * This prevents all writes to the buffer. Any attempt to write
3220 * to the buffer after this will fail and return NULL.
3221 *
3222 * This is different than ring_buffer_record_disable() as
3223 * it works like an on/off switch, where as the disable() version
3224 * must be paired with a enable().
3225 */
3226void ring_buffer_record_off(struct ring_buffer *buffer)
3227{
3228        unsigned int rd;
3229        unsigned int new_rd;
3230
3231        do {
3232                rd = atomic_read(&buffer->record_disabled);
3233                new_rd = rd | RB_BUFFER_OFF;
3234        } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3235}
3236EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3237
3238/**
3239 * ring_buffer_record_on - restart writes into the buffer
3240 * @buffer: The ring buffer to start writes to.
3241 *
3242 * This enables all writes to the buffer that was disabled by
3243 * ring_buffer_record_off().
3244 *
3245 * This is different than ring_buffer_record_enable() as
3246 * it works like an on/off switch, where as the enable() version
3247 * must be paired with a disable().
3248 */
3249void ring_buffer_record_on(struct ring_buffer *buffer)
3250{
3251        unsigned int rd;
3252        unsigned int new_rd;
3253
3254        do {
3255                rd = atomic_read(&buffer->record_disabled);
3256                new_rd = rd & ~RB_BUFFER_OFF;
3257        } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3258}
3259EXPORT_SYMBOL_GPL(ring_buffer_record_on);
3260
3261/**
3262 * ring_buffer_record_is_on - return true if the ring buffer can write
3263 * @buffer: The ring buffer to see if write is enabled
3264 *
3265 * Returns true if the ring buffer is in a state that it accepts writes.
3266 */
3267bool ring_buffer_record_is_on(struct ring_buffer *buffer)
3268{
3269        return !atomic_read(&buffer->record_disabled);
3270}
3271
3272/**
3273 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
3274 * @buffer: The ring buffer to see if write is set enabled
3275 *
3276 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
3277 * Note that this does NOT mean it is in a writable state.
3278 *
3279 * It may return true when the ring buffer has been disabled by
3280 * ring_buffer_record_disable(), as that is a temporary disabling of
3281 * the ring buffer.
3282 */
3283bool ring_buffer_record_is_set_on(struct ring_buffer *buffer)
3284{
3285        return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
3286}
3287
3288/**
3289 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
3290 * @buffer: The ring buffer to stop writes to.
3291 * @cpu: The CPU buffer to stop
3292 *
3293 * This prevents all writes to the buffer. Any attempt to write
3294 * to the buffer after this will fail and return NULL.
3295 *
3296 * The caller should call synchronize_rcu() after this.
3297 */
3298void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
3299{
3300        struct ring_buffer_per_cpu *cpu_buffer;
3301
3302        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3303                return;
3304
3305        cpu_buffer = buffer->buffers[cpu];
3306        atomic_inc(&cpu_buffer->record_disabled);
3307}
3308EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
3309
3310/**
3311 * ring_buffer_record_enable_cpu - enable writes to the buffer
3312 * @buffer: The ring buffer to enable writes
3313 * @cpu: The CPU to enable.
3314 *
3315 * Note, multiple disables will need the same number of enables
3316 * to truly enable the writing (much like preempt_disable).
3317 */
3318void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
3319{
3320        struct ring_buffer_per_cpu *cpu_buffer;
3321
3322        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3323                return;
3324
3325        cpu_buffer = buffer->buffers[cpu];
3326        atomic_dec(&cpu_buffer->record_disabled);
3327}
3328EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
3329
3330/*
3331 * The total entries in the ring buffer is the running counter
3332 * of entries entered into the ring buffer, minus the sum of
3333 * the entries read from the ring buffer and the number of
3334 * entries that were overwritten.
3335 */
3336static inline unsigned long
3337rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
3338{
3339        return local_read(&cpu_buffer->entries) -
3340                (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
3341}
3342
3343/**
3344 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
3345 * @buffer: The ring buffer
3346 * @cpu: The per CPU buffer to read from.
3347 */
3348u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
3349{
3350        unsigned long flags;
3351        struct ring_buffer_per_cpu *cpu_buffer;
3352        struct buffer_page *bpage;
3353        u64 ret = 0;
3354
3355        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3356                return 0;
3357
3358        cpu_buffer = buffer->buffers[cpu];
3359        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3360        /*
3361         * if the tail is on reader_page, oldest time stamp is on the reader
3362         * page
3363         */
3364        if (cpu_buffer->tail_page == cpu_buffer->reader_page)
3365                bpage = cpu_buffer->reader_page;
3366        else
3367                bpage = rb_set_head_page(cpu_buffer);
3368        if (bpage)
3369                ret = bpage->page->time_stamp;
3370        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3371
3372        return ret;
3373}
3374EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
3375
3376/**
3377 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
3378 * @buffer: The ring buffer
3379 * @cpu: The per CPU buffer to read from.
3380 */
3381unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
3382{
3383        struct ring_buffer_per_cpu *cpu_buffer;
3384        unsigned long ret;
3385
3386        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3387                return 0;
3388
3389        cpu_buffer = buffer->buffers[cpu];
3390        ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
3391
3392        return ret;
3393}
3394EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
3395
3396/**
3397 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
3398 * @buffer: The ring buffer
3399 * @cpu: The per CPU buffer to get the entries from.
3400 */
3401unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
3402{
3403        struct ring_buffer_per_cpu *cpu_buffer;
3404
3405        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3406                return 0;
3407
3408        cpu_buffer = buffer->buffers[cpu];
3409
3410        return rb_num_of_entries(cpu_buffer);
3411}
3412EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
3413
3414/**
3415 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
3416 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
3417 * @buffer: The ring buffer
3418 * @cpu: The per CPU buffer to get the number of overruns from
3419 */
3420unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
3421{
3422        struct ring_buffer_per_cpu *cpu_buffer;
3423        unsigned long ret;
3424
3425        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3426                return 0;
3427
3428        cpu_buffer = buffer->buffers[cpu];
3429        ret = local_read(&cpu_buffer->overrun);
3430
3431        return ret;
3432}
3433EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
3434
3435/**
3436 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
3437 * commits failing due to the buffer wrapping around while there are uncommitted
3438 * events, such as during an interrupt storm.
3439 * @buffer: The ring buffer
3440 * @cpu: The per CPU buffer to get the number of overruns from
3441 */
3442unsigned long
3443ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
3444{
3445        struct ring_buffer_per_cpu *cpu_buffer;
3446        unsigned long ret;
3447
3448        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3449                return 0;
3450
3451        cpu_buffer = buffer->buffers[cpu];
3452        ret = local_read(&cpu_buffer->commit_overrun);
3453
3454        return ret;
3455}
3456EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
3457
3458/**
3459 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
3460 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
3461 * @buffer: The ring buffer
3462 * @cpu: The per CPU buffer to get the number of overruns from
3463 */
3464unsigned long
3465ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu)
3466{
3467        struct ring_buffer_per_cpu *cpu_buffer;
3468        unsigned long ret;
3469
3470        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3471                return 0;
3472
3473        cpu_buffer = buffer->buffers[cpu];
3474        ret = local_read(&cpu_buffer->dropped_events);
3475
3476        return ret;
3477}
3478EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
3479
3480/**
3481 * ring_buffer_read_events_cpu - get the number of events successfully read
3482 * @buffer: The ring buffer
3483 * @cpu: The per CPU buffer to get the number of events read
3484 */
3485unsigned long
3486ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu)
3487{
3488        struct ring_buffer_per_cpu *cpu_buffer;
3489
3490        if (!cpumask_test_cpu(cpu, buffer->cpumask))
3491                return 0;
3492
3493        cpu_buffer = buffer->buffers[cpu];
3494        return cpu_buffer->read;
3495}
3496EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
3497
3498/**
3499 * ring_buffer_entries - get the number of entries in a buffer
3500 * @buffer: The ring buffer
3501 *
3502 * Returns the total number of entries in the ring buffer
3503 * (all CPU entries)
3504 */
3505unsigned long ring_buffer_entries(struct ring_buffer *buffer)
3506{
3507        struct ring_buffer_per_cpu *cpu_buffer;
3508        unsigned long entries = 0;
3509        int cpu;
3510
3511        /* if you care about this being correct, lock the buffer */
3512        for_each_buffer_cpu(buffer, cpu) {
3513                cpu_buffer = buffer->buffers[cpu];
3514                entries += rb_num_of_entries(cpu_buffer);
3515        }
3516
3517        return entries;
3518}
3519EXPORT_SYMBOL_GPL(ring_buffer_entries);
3520
3521/**
3522 * ring_buffer_overruns - get the number of overruns in buffer
3523 * @buffer: The ring buffer
3524 *
3525 * Returns the total number of overruns in the ring buffer
3526 * (all CPU entries)
3527 */
3528unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
3529{
3530        struct ring_buffer_per_cpu *cpu_buffer;
3531        unsigned long overruns = 0;
3532        int cpu;
3533
3534        /* if you care about this being correct, lock the buffer */
3535        for_each_buffer_cpu(buffer, cpu) {
3536                cpu_buffer = buffer->buffers[cpu];
3537                overruns += local_read(&cpu_buffer->overrun);
3538        }
3539
3540        return overruns;
3541}
3542EXPORT_SYMBOL_GPL(ring_buffer_overruns);
3543
3544static void rb_iter_reset(struct ring_buffer_iter *iter)
3545{
3546        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3547
3548        /* Iterator usage is expected to have record disabled */
3549        iter->head_page = cpu_buffer->reader_page;
3550        iter->head = cpu_buffer->reader_page->read;
3551
3552        iter->cache_reader_page = iter->head_page;
3553        iter->cache_read = cpu_buffer->read;
3554
3555        if (iter->head)
3556                iter->read_stamp = cpu_buffer->read_stamp;
3557        else
3558                iter->read_stamp = iter->head_page->page->time_stamp;
3559}
3560
3561/**
3562 * ring_buffer_iter_reset - reset an iterator
3563 * @iter: The iterator to reset
3564 *
3565 * Resets the iterator, so that it will start from the beginning
3566 * again.
3567 */
3568void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
3569{
3570        struct ring_buffer_per_cpu *cpu_buffer;
3571        unsigned long flags;
3572
3573        if (!iter)
3574                return;
3575
3576        cpu_buffer = iter->cpu_buffer;
3577
3578        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3579        rb_iter_reset(iter);
3580        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3581}
3582EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
3583
3584/**
3585 * ring_buffer_iter_empty - check if an iterator has no more to read
3586 * @iter: The iterator to check
3587 */
3588int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
3589{
3590        struct ring_buffer_per_cpu *cpu_buffer;
3591        struct buffer_page *reader;
3592        struct buffer_page *head_page;
3593        struct buffer_page *commit_page;
3594        unsigned commit;
3595
3596        cpu_buffer = iter->cpu_buffer;
3597
3598        /* Remember, trace recording is off when iterator is in use */
3599        reader = cpu_buffer->reader_page;
3600        head_page = cpu_buffer->head_page;
3601        commit_page = cpu_buffer->commit_page;
3602        commit = rb_page_commit(commit_page);
3603
3604        return ((iter->head_page == commit_page && iter->head == commit) ||
3605                (iter->head_page == reader && commit_page == head_page &&
3606                 head_page->read == commit &&
3607                 iter->head == rb_page_commit(cpu_buffer->reader_page)));
3608}
3609EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
3610
3611static void
3612rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
3613                     struct ring_buffer_event *event)
3614{
3615        u64 delta;
3616
3617        switch (event->type_len) {
3618        case RINGBUF_TYPE_PADDING:
3619                return;
3620
3621        case RINGBUF_TYPE_TIME_EXTEND:
3622                delta = ring_buffer_event_time_stamp(event);
3623                cpu_buffer->read_stamp += delta;
3624                return;
3625
3626        case RINGBUF_TYPE_TIME_STAMP:
3627                delta = ring_buffer_event_time_stamp(event);
3628                cpu_buffer->read_stamp = delta;
3629                return;
3630
3631        case RINGBUF_TYPE_DATA:
3632                cpu_buffer->read_stamp += event->time_delta;
3633                return;
3634
3635        default:
3636                BUG();
3637        }
3638        return;
3639}
3640
3641static void
3642rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
3643                          struct ring_buffer_event *event)
3644{
3645        u64 delta;
3646
3647        switch (event->type_len) {
3648        case RINGBUF_TYPE_PADDING:
3649                return;
3650
3651        case RINGBUF_TYPE_TIME_EXTEND:
3652                delta = ring_buffer_event_time_stamp(event);
3653                iter->read_stamp += delta;
3654                return;
3655
3656        case RINGBUF_TYPE_TIME_STAMP:
3657                delta = ring_buffer_event_time_stamp(event);
3658                iter->read_stamp = delta;
3659                return;
3660
3661        case RINGBUF_TYPE_DATA:
3662                iter->read_stamp += event->time_delta;
3663                return;
3664
3665        default:
3666                BUG();
3667        }
3668        return;
3669}
3670
3671static struct buffer_page *
3672rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
3673{
3674        struct buffer_page *reader = NULL;
3675        unsigned long overwrite;
3676        unsigned long flags;
3677        int nr_loops = 0;
3678        int ret;
3679
3680        local_irq_save(flags);
3681        arch_spin_lock(&cpu_buffer->lock);
3682
3683 again:
3684        /*
3685         * This should normally only loop twice. But because the
3686         * start of the reader inserts an empty page, it causes
3687         * a case where we will loop three times. There should be no
3688         * reason to loop four times (that I know of).
3689         */
3690        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3691                reader = NULL;
3692                goto out;
3693        }
3694
3695        reader = cpu_buffer->reader_page;
3696
3697        /* If there's more to read, return this page */
3698        if (cpu_buffer->reader_page->read < rb_page_size(reader))
3699                goto out;
3700
3701        /* Never should we have an index greater than the size */
3702        if (RB_WARN_ON(cpu_buffer,
3703                       cpu_buffer->reader_page->read > rb_page_size(reader)))
3704                goto out;
3705
3706        /* check if we caught up to the tail */
3707        reader = NULL;
3708        if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3709                goto out;
3710
3711        /* Don't bother swapping if the ring buffer is empty */
3712        if (rb_num_of_entries(cpu_buffer) == 0)
3713                goto out;
3714
3715        /*
3716         * Reset the reader page to size zero.
3717         */
3718        local_set(&cpu_buffer->reader_page->write, 0);
3719        local_set(&cpu_buffer->reader_page->entries, 0);
3720        local_set(&cpu_buffer->reader_page->page->commit, 0);
3721        cpu_buffer->reader_page->real_end = 0;
3722
3723 spin:
3724        /*
3725         * Splice the empty reader page into the list around the head.
3726         */
3727        reader = rb_set_head_page(cpu_buffer);
3728        if (!reader)
3729                goto out;
3730        cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3731        cpu_buffer->reader_page->list.prev = reader->list.prev;
3732
3733        /*
3734         * cpu_buffer->pages just needs to point to the buffer, it
3735         *  has no specific buffer page to point to. Lets move it out
3736         *  of our way so we don't accidentally swap it.
3737         */
3738        cpu_buffer->pages = reader->list.prev;
3739
3740        /* The reader page will be pointing to the new head */
3741        rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3742
3743        /*
3744         * We want to make sure we read the overruns after we set up our
3745         * pointers to the next object. The writer side does a
3746         * cmpxchg to cross pages which acts as the mb on the writer
3747         * side. Note, the reader will constantly fail the swap
3748         * while the writer is updating the pointers, so this
3749         * guarantees that the overwrite recorded here is the one we
3750         * want to compare with the last_overrun.
3751         */
3752        smp_mb();
3753        overwrite = local_read(&(cpu_buffer->overrun));
3754
3755        /*
3756         * Here's the tricky part.
3757         *
3758         * We need to move the pointer past the header page.
3759         * But we can only do that if a writer is not currently
3760         * moving it. The page before the header page has the
3761         * flag bit '1' set if it is pointing to the page we want.
3762         * but if the writer is in the process of moving it
3763         * than it will be '2' or already moved '0'.
3764         */
3765
3766        ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3767
3768        /*
3769         * If we did not convert it, then we must try again.
3770         */
3771        if (!ret)
3772                goto spin;
3773
3774        /*
3775         * Yay! We succeeded in replacing the page.
3776         *
3777         * Now make the new head point back to the reader page.
3778         */
3779        rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3780        rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3781
3782        local_inc(&cpu_buffer->pages_read);
3783
3784        /* Finally update the reader page to the new head */
3785        cpu_buffer->reader_page = reader;
3786        cpu_buffer->reader_page->read = 0;
3787
3788        if (overwrite != cpu_buffer->last_overrun) {
3789                cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3790                cpu_buffer->last_overrun = overwrite;
3791        }
3792
3793        goto again;
3794
3795 out:
3796        /* Update the read_stamp on the first event */
3797        if (reader && reader->read == 0)
3798                cpu_buffer->read_stamp = reader->page->time_stamp;
3799
3800        arch_spin_unlock(&cpu_buffer->lock);
3801        local_irq_restore(flags);
3802
3803        return reader;
3804}
3805
3806static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
3807{
3808        struct ring_buffer_event *event;
3809        struct buffer_page *reader;
3810        unsigned length;
3811
3812        reader = rb_get_reader_page(cpu_buffer);
3813
3814        /* This function should not be called when buffer is empty */
3815        if (RB_WARN_ON(cpu_buffer, !reader))
3816                return;
3817
3818        event = rb_reader_event(cpu_buffer);
3819
3820        if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
3821                cpu_buffer->read++;
3822
3823        rb_update_read_stamp(cpu_buffer, event);
3824
3825        length = rb_event_length(event);
3826        cpu_buffer->reader_page->read += length;
3827}
3828
3829static void rb_advance_iter(struct ring_buffer_iter *iter)
3830{
3831        struct ring_buffer_per_cpu *cpu_buffer;
3832        struct ring_buffer_event *event;
3833        unsigned length;
3834
3835        cpu_buffer = iter->cpu_buffer;
3836
3837        /*
3838         * Check if we are at the end of the buffer.
3839         */
3840        if (iter->head >= rb_page_size(iter->head_page)) {
3841                /* discarded commits can make the page empty */
3842                if (iter->head_page == cpu_buffer->commit_page)
3843                        return;
3844                rb_inc_iter(iter);
3845                return;
3846        }
3847
3848        event = rb_iter_head_event(iter);
3849
3850        length = rb_event_length(event);
3851
3852        /*
3853         * This should not be called to advance the header if we are
3854         * at the tail of the buffer.
3855         */
3856        if (RB_WARN_ON(cpu_buffer,
3857                       (iter->head_page == cpu_buffer->commit_page) &&
3858                       (iter->head + length > rb_commit_index(cpu_buffer))))
3859                return;
3860
3861        rb_update_iter_read_stamp(iter, event);
3862
3863        iter->head += length;
3864
3865        /* check for end of page padding */
3866        if ((iter->head >= rb_page_size(iter->head_page)) &&
3867            (iter->head_page != cpu_buffer->commit_page))
3868                rb_inc_iter(iter);
3869}
3870
3871static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
3872{
3873        return cpu_buffer->lost_events;
3874}
3875
3876static struct ring_buffer_event *
3877rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
3878               unsigned long *lost_events)
3879{
3880        struct ring_buffer_event *event;
3881        struct buffer_page *reader;
3882        int nr_loops = 0;
3883
3884        if (ts)
3885                *ts = 0;
3886 again:
3887        /*
3888         * We repeat when a time extend is encountered.
3889         * Since the time extend is always attached to a data event,
3890         * we should never loop more than once.
3891         * (We never hit the following condition more than twice).
3892         */
3893        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
3894                return NULL;
3895
3896        reader = rb_get_reader_page(cpu_buffer);
3897        if (!reader)
3898                return NULL;
3899
3900        event = rb_reader_event(cpu_buffer);
3901
3902        switch (event->type_len) {
3903        case RINGBUF_TYPE_PADDING:
3904                if (rb_null_event(event))
3905                        RB_WARN_ON(cpu_buffer, 1);
3906                /*
3907                 * Because the writer could be discarding every
3908                 * event it creates (which would probably be bad)
3909                 * if we were to go back to "again" then we may never
3910                 * catch up, and will trigger the warn on, or lock
3911                 * the box. Return the padding, and we will release
3912                 * the current locks, and try again.
3913                 */
3914                return event;
3915
3916        case RINGBUF_TYPE_TIME_EXTEND:
3917                /* Internal data, OK to advance */
3918                rb_advance_reader(cpu_buffer);
3919                goto again;
3920
3921        case RINGBUF_TYPE_TIME_STAMP:
3922                if (ts) {
3923                        *ts = ring_buffer_event_time_stamp(event);
3924                        ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
3925                                                         cpu_buffer->cpu, ts);
3926                }
3927                /* Internal data, OK to advance */
3928                rb_advance_reader(cpu_buffer);
3929                goto again;
3930
3931        case RINGBUF_TYPE_DATA:
3932                if (ts && !(*ts)) {
3933                        *ts = cpu_buffer->read_stamp + event->time_delta;
3934                        ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
3935                                                         cpu_buffer->cpu, ts);
3936                }
3937                if (lost_events)
3938                        *lost_events = rb_lost_events(cpu_buffer);
3939                return event;
3940
3941        default:
3942                BUG();
3943        }
3944
3945        return NULL;
3946}
3947EXPORT_SYMBOL_GPL(ring_buffer_peek);
3948
3949static struct ring_buffer_event *
3950rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3951{
3952        struct ring_buffer *buffer;
3953        struct ring_buffer_per_cpu *cpu_buffer;
3954        struct ring_buffer_event *event;
3955        int nr_loops = 0;
3956
3957        if (ts)
3958                *ts = 0;
3959
3960        cpu_buffer = iter->cpu_buffer;
3961        buffer = cpu_buffer->buffer;
3962
3963        /*
3964         * Check if someone performed a consuming read to
3965         * the buffer. A consuming read invalidates the iterator
3966         * and we need to reset the iterator in this case.
3967         */
3968        if (unlikely(iter->cache_read != cpu_buffer->read ||
3969                     iter->cache_reader_page != cpu_buffer->reader_page))
3970                rb_iter_reset(iter);
3971
3972 again:
3973        if (ring_buffer_iter_empty(iter))
3974                return NULL;
3975
3976        /*
3977         * We repeat when a time extend is encountered or we hit
3978         * the end of the page. Since the time extend is always attached
3979         * to a data event, we should never loop more than three times.
3980         * Once for going to next page, once on time extend, and
3981         * finally once to get the event.
3982         * (We never hit the following condition more than thrice).
3983         */
3984        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3))
3985                return NULL;
3986
3987        if (rb_per_cpu_empty(cpu_buffer))
3988                return NULL;
3989
3990        if (iter->head >= rb_page_size(iter->head_page)) {
3991                rb_inc_iter(iter);
3992                goto again;
3993        }
3994
3995        event = rb_iter_head_event(iter);
3996
3997        switch (event->type_len) {
3998        case RINGBUF_TYPE_PADDING:
3999                if (rb_null_event(event)) {
4000                        rb_inc_iter(iter);
4001                        goto again;
4002                }
4003                rb_advance_iter(iter);
4004                return event;
4005
4006        case RINGBUF_TYPE_TIME_EXTEND:
4007                /* Internal data, OK to advance */
4008                rb_advance_iter(iter);
4009                goto again;
4010
4011        case RINGBUF_TYPE_TIME_STAMP:
4012                if (ts) {
4013                        *ts = ring_buffer_event_time_stamp(event);
4014                        ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4015                                                         cpu_buffer->cpu, ts);
4016                }
4017                /* Internal data, OK to advance */
4018                rb_advance_iter(iter);
4019                goto again;
4020
4021        case RINGBUF_TYPE_DATA:
4022                if (ts && !(*ts)) {
4023                        *ts = iter->read_stamp + event->time_delta;
4024                        ring_buffer_normalize_time_stamp(buffer,
4025                                                         cpu_buffer->cpu, ts);
4026                }
4027                return event;
4028
4029        default:
4030                BUG();
4031        }
4032
4033        return NULL;
4034}
4035EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4036
4037static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4038{
4039        if (likely(!in_nmi())) {
4040                raw_spin_lock(&cpu_buffer->reader_lock);
4041                return true;
4042        }
4043
4044        /*
4045         * If an NMI die dumps out the content of the ring buffer
4046         * trylock must be used to prevent a deadlock if the NMI
4047         * preempted a task that holds the ring buffer locks. If
4048         * we get the lock then all is fine, if not, then continue
4049         * to do the read, but this can corrupt the ring buffer,
4050         * so it must be permanently disabled from future writes.
4051         * Reading from NMI is a oneshot deal.
4052         */
4053        if (raw_spin_trylock(&cpu_buffer->reader_lock))
4054                return true;
4055
4056        /* Continue without locking, but disable the ring buffer */
4057        atomic_inc(&cpu_buffer->record_disabled);
4058        return false;
4059}
4060
4061static inline void
4062rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4063{
4064        if (likely(locked))
4065                raw_spin_unlock(&cpu_buffer->reader_lock);
4066        return;
4067}
4068
4069/**
4070 * ring_buffer_peek - peek at the next event to be read
4071 * @buffer: The ring buffer to read
4072 * @cpu: The cpu to peak at
4073 * @ts: The timestamp counter of this event.
4074 * @lost_events: a variable to store if events were lost (may be NULL)
4075 *
4076 * This will return the event that will be read next, but does
4077 * not consume the data.
4078 */
4079struct ring_buffer_event *
4080ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
4081                 unsigned long *lost_events)
4082{
4083        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4084        struct ring_buffer_event *event;
4085        unsigned long flags;
4086        bool dolock;
4087
4088        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4089                return NULL;
4090
4091 again:
4092        local_irq_save(flags);
4093        dolock = rb_reader_lock(cpu_buffer);
4094        event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4095        if (event && event->type_len == RINGBUF_TYPE_PADDING)
4096                rb_advance_reader(cpu_buffer);
4097        rb_reader_unlock(cpu_buffer, dolock);
4098        local_irq_restore(flags);
4099
4100        if (event && event->type_len == RINGBUF_TYPE_PADDING)
4101                goto again;
4102
4103        return event;
4104}
4105
4106/**
4107 * ring_buffer_iter_peek - peek at the next event to be read
4108 * @iter: The ring buffer iterator
4109 * @ts: The timestamp counter of this event.
4110 *
4111 * This will return the event that will be read next, but does
4112 * not increment the iterator.
4113 */
4114struct ring_buffer_event *
4115ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4116{
4117        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4118        struct ring_buffer_event *event;
4119        unsigned long flags;
4120
4121 again:
4122        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4123        event = rb_iter_peek(iter, ts);
4124        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4125
4126        if (event && event->type_len == RINGBUF_TYPE_PADDING)
4127                goto again;
4128
4129        return event;
4130}
4131
4132/**
4133 * ring_buffer_consume - return an event and consume it
4134 * @buffer: The ring buffer to get the next event from
4135 * @cpu: the cpu to read the buffer from
4136 * @ts: a variable to store the timestamp (may be NULL)
4137 * @lost_events: a variable to store if events were lost (may be NULL)
4138 *
4139 * Returns the next event in the ring buffer, and that event is consumed.
4140 * Meaning, that sequential reads will keep returning a different event,
4141 * and eventually empty the ring buffer if the producer is slower.
4142 */
4143struct ring_buffer_event *
4144ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
4145                    unsigned long *lost_events)
4146{
4147        struct ring_buffer_per_cpu *cpu_buffer;
4148        struct ring_buffer_event *event = NULL;
4149        unsigned long flags;
4150        bool dolock;
4151
4152 again:
4153        /* might be called in atomic */
4154        preempt_disable();
4155
4156        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4157                goto out;
4158
4159        cpu_buffer = buffer->buffers[cpu];
4160        local_irq_save(flags);
4161        dolock = rb_reader_lock(cpu_buffer);
4162
4163        event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4164        if (event) {
4165                cpu_buffer->lost_events = 0;
4166                rb_advance_reader(cpu_buffer);
4167        }
4168
4169        rb_reader_unlock(cpu_buffer, dolock);
4170        local_irq_restore(flags);
4171
4172 out:
4173        preempt_enable();
4174
4175        if (event && event->type_len == RINGBUF_TYPE_PADDING)
4176                goto again;
4177
4178        return event;
4179}
4180EXPORT_SYMBOL_GPL(ring_buffer_consume);
4181
4182/**
4183 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4184 * @buffer: The ring buffer to read from
4185 * @cpu: The cpu buffer to iterate over
4186 * @flags: gfp flags to use for memory allocation
4187 *
4188 * This performs the initial preparations necessary to iterate
4189 * through the buffer.  Memory is allocated, buffer recording
4190 * is disabled, and the iterator pointer is returned to the caller.
4191 *
4192 * Disabling buffer recording prevents the reading from being
4193 * corrupted. This is not a consuming read, so a producer is not
4194 * expected.
4195 *
4196 * After a sequence of ring_buffer_read_prepare calls, the user is
4197 * expected to make at least one call to ring_buffer_read_prepare_sync.
4198 * Afterwards, ring_buffer_read_start is invoked to get things going
4199 * for real.
4200 *
4201 * This overall must be paired with ring_buffer_read_finish.
4202 */
4203struct ring_buffer_iter *
4204ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu, gfp_t flags)
4205{
4206        struct ring_buffer_per_cpu *cpu_buffer;
4207        struct ring_buffer_iter *iter;
4208
4209        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4210                return NULL;
4211
4212        iter = kmalloc(sizeof(*iter), flags);
4213        if (!iter)
4214                return NULL;
4215
4216        cpu_buffer = buffer->buffers[cpu];
4217
4218        iter->cpu_buffer = cpu_buffer;
4219
4220        atomic_inc(&buffer->resize_disabled);
4221        atomic_inc(&cpu_buffer->record_disabled);
4222
4223        return iter;
4224}
4225EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
4226
4227/**
4228 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4229 *
4230 * All previously invoked ring_buffer_read_prepare calls to prepare
4231 * iterators will be synchronized.  Afterwards, read_buffer_read_start
4232 * calls on those iterators are allowed.
4233 */
4234void
4235ring_buffer_read_prepare_sync(void)
4236{
4237        synchronize_rcu();
4238}
4239EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4240
4241/**
4242 * ring_buffer_read_start - start a non consuming read of the buffer
4243 * @iter: The iterator returned by ring_buffer_read_prepare
4244 *
4245 * This finalizes the startup of an iteration through the buffer.
4246 * The iterator comes from a call to ring_buffer_read_prepare and
4247 * an intervening ring_buffer_read_prepare_sync must have been
4248 * performed.
4249 *
4250 * Must be paired with ring_buffer_read_finish.
4251 */
4252void
4253ring_buffer_read_start(struct ring_buffer_iter *iter)
4254{
4255        struct ring_buffer_per_cpu *cpu_buffer;
4256        unsigned long flags;
4257
4258        if (!iter)
4259                return;
4260
4261        cpu_buffer = iter->cpu_buffer;
4262
4263        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4264        arch_spin_lock(&cpu_buffer->lock);
4265        rb_iter_reset(iter);
4266        arch_spin_unlock(&cpu_buffer->lock);
4267        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4268}
4269EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4270
4271/**
4272 * ring_buffer_read_finish - finish reading the iterator of the buffer
4273 * @iter: The iterator retrieved by ring_buffer_start
4274 *
4275 * This re-enables the recording to the buffer, and frees the
4276 * iterator.
4277 */
4278void
4279ring_buffer_read_finish(struct ring_buffer_iter *iter)
4280{
4281        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4282        unsigned long flags;
4283
4284        /*
4285         * Ring buffer is disabled from recording, here's a good place
4286         * to check the integrity of the ring buffer.
4287         * Must prevent readers from trying to read, as the check
4288         * clears the HEAD page and readers require it.
4289         */
4290        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4291        rb_check_pages(cpu_buffer);
4292        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4293
4294        atomic_dec(&cpu_buffer->record_disabled);
4295        atomic_dec(&cpu_buffer->buffer->resize_disabled);
4296        kfree(iter);
4297}
4298EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
4299
4300/**
4301 * ring_buffer_read - read the next item in the ring buffer by the iterator
4302 * @iter: The ring buffer iterator
4303 * @ts: The time stamp of the event read.
4304 *
4305 * This reads the next event in the ring buffer and increments the iterator.
4306 */
4307struct ring_buffer_event *
4308ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
4309{
4310        struct ring_buffer_event *event;
4311        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4312        unsigned long flags;
4313
4314        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4315 again:
4316        event = rb_iter_peek(iter, ts);
4317        if (!event)
4318                goto out;
4319
4320        if (event->type_len == RINGBUF_TYPE_PADDING)
4321                goto again;
4322
4323        rb_advance_iter(iter);
4324 out:
4325        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4326
4327        return event;
4328}
4329EXPORT_SYMBOL_GPL(ring_buffer_read);
4330
4331/**
4332 * ring_buffer_size - return the size of the ring buffer (in bytes)
4333 * @buffer: The ring buffer.
4334 */
4335unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
4336{
4337        /*
4338         * Earlier, this method returned
4339         *      BUF_PAGE_SIZE * buffer->nr_pages
4340         * Since the nr_pages field is now removed, we have converted this to
4341         * return the per cpu buffer value.
4342         */
4343        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4344                return 0;
4345
4346        return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
4347}
4348EXPORT_SYMBOL_GPL(ring_buffer_size);
4349
4350static void
4351rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
4352{
4353        rb_head_page_deactivate(cpu_buffer);
4354
4355        cpu_buffer->head_page
4356                = list_entry(cpu_buffer->pages, struct buffer_page, list);
4357        local_set(&cpu_buffer->head_page->write, 0);
4358        local_set(&cpu_buffer->head_page->entries, 0);
4359        local_set(&cpu_buffer->head_page->page->commit, 0);
4360
4361        cpu_buffer->head_page->read = 0;
4362
4363        cpu_buffer->tail_page = cpu_buffer->head_page;
4364        cpu_buffer->commit_page = cpu_buffer->head_page;
4365
4366        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
4367        INIT_LIST_HEAD(&cpu_buffer->new_pages);
4368        local_set(&cpu_buffer->reader_page->write, 0);
4369        local_set(&cpu_buffer->reader_page->entries, 0);
4370        local_set(&cpu_buffer->reader_page->page->commit, 0);
4371        cpu_buffer->reader_page->read = 0;
4372
4373        local_set(&cpu_buffer->entries_bytes, 0);
4374        local_set(&cpu_buffer->overrun, 0);
4375        local_set(&cpu_buffer->commit_overrun, 0);
4376        local_set(&cpu_buffer->dropped_events, 0);
4377        local_set(&cpu_buffer->entries, 0);
4378        local_set(&cpu_buffer->committing, 0);
4379        local_set(&cpu_buffer->commits, 0);
4380        local_set(&cpu_buffer->pages_touched, 0);
4381        local_set(&cpu_buffer->pages_read, 0);
4382        cpu_buffer->last_pages_touch = 0;
4383        cpu_buffer->shortest_full = 0;
4384        cpu_buffer->read = 0;
4385        cpu_buffer->read_bytes = 0;
4386
4387        cpu_buffer->write_stamp = 0;
4388        cpu_buffer->read_stamp = 0;
4389
4390        cpu_buffer->lost_events = 0;
4391        cpu_buffer->last_overrun = 0;
4392
4393        rb_head_page_activate(cpu_buffer);
4394}
4395
4396/**
4397 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
4398 * @buffer: The ring buffer to reset a per cpu buffer of
4399 * @cpu: The CPU buffer to be reset
4400 */
4401void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
4402{
4403        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4404        unsigned long flags;
4405
4406        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4407                return;
4408
4409        atomic_inc(&buffer->resize_disabled);
4410        atomic_inc(&cpu_buffer->record_disabled);
4411
4412        /* Make sure all commits have finished */
4413        synchronize_rcu();
4414
4415        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4416
4417        if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
4418                goto out;
4419
4420        arch_spin_lock(&cpu_buffer->lock);
4421
4422        rb_reset_cpu(cpu_buffer);
4423
4424        arch_spin_unlock(&cpu_buffer->lock);
4425
4426 out:
4427        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4428
4429        atomic_dec(&cpu_buffer->record_disabled);
4430        atomic_dec(&buffer->resize_disabled);
4431}
4432EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
4433
4434/**
4435 * ring_buffer_reset - reset a ring buffer
4436 * @buffer: The ring buffer to reset all cpu buffers
4437 */
4438void ring_buffer_reset(struct ring_buffer *buffer)
4439{
4440        int cpu;
4441
4442        for_each_buffer_cpu(buffer, cpu)
4443                ring_buffer_reset_cpu(buffer, cpu);
4444}
4445EXPORT_SYMBOL_GPL(ring_buffer_reset);
4446
4447/**
4448 * rind_buffer_empty - is the ring buffer empty?
4449 * @buffer: The ring buffer to test
4450 */
4451bool ring_buffer_empty(struct ring_buffer *buffer)
4452{
4453        struct ring_buffer_per_cpu *cpu_buffer;
4454        unsigned long flags;
4455        bool dolock;
4456        int cpu;
4457        int ret;
4458
4459        /* yes this is racy, but if you don't like the race, lock the buffer */
4460        for_each_buffer_cpu(buffer, cpu) {
4461                cpu_buffer = buffer->buffers[cpu];
4462                local_irq_save(flags);
4463                dolock = rb_reader_lock(cpu_buffer);
4464                ret = rb_per_cpu_empty(cpu_buffer);
4465                rb_reader_unlock(cpu_buffer, dolock);
4466                local_irq_restore(flags);
4467
4468                if (!ret)
4469                        return false;
4470        }
4471
4472        return true;
4473}
4474EXPORT_SYMBOL_GPL(ring_buffer_empty);
4475
4476/**
4477 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
4478 * @buffer: The ring buffer
4479 * @cpu: The CPU buffer to test
4480 */
4481bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
4482{
4483        struct ring_buffer_per_cpu *cpu_buffer;
4484        unsigned long flags;
4485        bool dolock;
4486        int ret;
4487
4488        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4489                return true;
4490
4491        cpu_buffer = buffer->buffers[cpu];
4492        local_irq_save(flags);
4493        dolock = rb_reader_lock(cpu_buffer);
4494        ret = rb_per_cpu_empty(cpu_buffer);
4495        rb_reader_unlock(cpu_buffer, dolock);
4496        local_irq_restore(flags);
4497
4498        return ret;
4499}
4500EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
4501
4502#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
4503/**
4504 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
4505 * @buffer_a: One buffer to swap with
4506 * @buffer_b: The other buffer to swap with
4507 *
4508 * This function is useful for tracers that want to take a "snapshot"
4509 * of a CPU buffer and has another back up buffer lying around.
4510 * it is expected that the tracer handles the cpu buffer not being
4511 * used at the moment.
4512 */
4513int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
4514                         struct ring_buffer *buffer_b, int cpu)
4515{
4516        struct ring_buffer_per_cpu *cpu_buffer_a;
4517        struct ring_buffer_per_cpu *cpu_buffer_b;
4518        int ret = -EINVAL;
4519
4520        if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
4521            !cpumask_test_cpu(cpu, buffer_b->cpumask))
4522                goto out;
4523
4524        cpu_buffer_a = buffer_a->buffers[cpu];
4525        cpu_buffer_b = buffer_b->buffers[cpu];
4526
4527        /* At least make sure the two buffers are somewhat the same */
4528        if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
4529                goto out;
4530
4531        ret = -EAGAIN;
4532
4533        if (atomic_read(&buffer_a->record_disabled))
4534                goto out;
4535
4536        if (atomic_read(&buffer_b->record_disabled))
4537                goto out;
4538
4539        if (atomic_read(&cpu_buffer_a->record_disabled))
4540                goto out;
4541
4542        if (atomic_read(&cpu_buffer_b->record_disabled))
4543                goto out;
4544
4545        /*
4546         * We can't do a synchronize_rcu here because this
4547         * function can be called in atomic context.
4548         * Normally this will be called from the same CPU as cpu.
4549         * If not it's up to the caller to protect this.
4550         */
4551        atomic_inc(&cpu_buffer_a->record_disabled);
4552        atomic_inc(&cpu_buffer_b->record_disabled);
4553
4554        ret = -EBUSY;
4555        if (local_read(&cpu_buffer_a->committing))
4556                goto out_dec;
4557        if (local_read(&cpu_buffer_b->committing))
4558                goto out_dec;
4559
4560        buffer_a->buffers[cpu] = cpu_buffer_b;
4561        buffer_b->buffers[cpu] = cpu_buffer_a;
4562
4563        cpu_buffer_b->buffer = buffer_a;
4564        cpu_buffer_a->buffer = buffer_b;
4565
4566        ret = 0;
4567
4568out_dec:
4569        atomic_dec(&cpu_buffer_a->record_disabled);
4570        atomic_dec(&cpu_buffer_b->record_disabled);
4571out:
4572        return ret;
4573}
4574EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
4575#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
4576
4577/**
4578 * ring_buffer_alloc_read_page - allocate a page to read from buffer
4579 * @buffer: the buffer to allocate for.
4580 * @cpu: the cpu buffer to allocate.
4581 *
4582 * This function is used in conjunction with ring_buffer_read_page.
4583 * When reading a full page from the ring buffer, these functions
4584 * can be used to speed up the process. The calling function should
4585 * allocate a few pages first with this function. Then when it
4586 * needs to get pages from the ring buffer, it passes the result
4587 * of this function into ring_buffer_read_page, which will swap
4588 * the page that was allocated, with the read page of the buffer.
4589 *
4590 * Returns:
4591 *  The page allocated, or ERR_PTR
4592 */
4593void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
4594{
4595        struct ring_buffer_per_cpu *cpu_buffer;
4596        struct buffer_data_page *bpage = NULL;
4597        unsigned long flags;
4598        struct page *page;
4599
4600        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4601                return ERR_PTR(-ENODEV);
4602
4603        cpu_buffer = buffer->buffers[cpu];
4604        local_irq_save(flags);
4605        arch_spin_lock(&cpu_buffer->lock);
4606
4607        if (cpu_buffer->free_page) {
4608                bpage = cpu_buffer->free_page;
4609                cpu_buffer->free_page = NULL;
4610        }
4611
4612        arch_spin_unlock(&cpu_buffer->lock);
4613        local_irq_restore(flags);
4614
4615        if (bpage)
4616                goto out;
4617
4618        page = alloc_pages_node(cpu_to_node(cpu),
4619                                GFP_KERNEL | __GFP_NORETRY, 0);
4620        if (!page)
4621                return ERR_PTR(-ENOMEM);
4622
4623        bpage = page_address(page);
4624
4625 out:
4626        rb_init_page(bpage);
4627
4628        return bpage;
4629}
4630EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
4631
4632/**
4633 * ring_buffer_free_read_page - free an allocated read page
4634 * @buffer: the buffer the page was allocate for
4635 * @cpu: the cpu buffer the page came from
4636 * @data: the page to free
4637 *
4638 * Free a page allocated from ring_buffer_alloc_read_page.
4639 */
4640void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data)
4641{
4642        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4643        struct buffer_data_page *bpage = data;
4644        struct page *page = virt_to_page(bpage);
4645        unsigned long flags;
4646
4647        /* If the page is still in use someplace else, we can't reuse it */
4648        if (page_ref_count(page) > 1)
4649                goto out;
4650
4651        local_irq_save(flags);
4652        arch_spin_lock(&cpu_buffer->lock);
4653
4654        if (!cpu_buffer->free_page) {
4655                cpu_buffer->free_page = bpage;
4656                bpage = NULL;
4657        }
4658
4659        arch_spin_unlock(&cpu_buffer->lock);
4660        local_irq_restore(flags);
4661
4662 out:
4663        free_page((unsigned long)bpage);
4664}
4665EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
4666
4667/**
4668 * ring_buffer_read_page - extract a page from the ring buffer
4669 * @buffer: buffer to extract from
4670 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
4671 * @len: amount to extract
4672 * @cpu: the cpu of the buffer to extract
4673 * @full: should the extraction only happen when the page is full.
4674 *
4675 * This function will pull out a page from the ring buffer and consume it.
4676 * @data_page must be the address of the variable that was returned
4677 * from ring_buffer_alloc_read_page. This is because the page might be used
4678 * to swap with a page in the ring buffer.
4679 *
4680 * for example:
4681 *      rpage = ring_buffer_alloc_read_page(buffer, cpu);
4682 *      if (IS_ERR(rpage))
4683 *              return PTR_ERR(rpage);
4684 *      ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
4685 *      if (ret >= 0)
4686 *              process_page(rpage, ret);
4687 *
4688 * When @full is set, the function will not return true unless
4689 * the writer is off the reader page.
4690 *
4691 * Note: it is up to the calling functions to handle sleeps and wakeups.
4692 *  The ring buffer can be used anywhere in the kernel and can not
4693 *  blindly call wake_up. The layer that uses the ring buffer must be
4694 *  responsible for that.
4695 *
4696 * Returns:
4697 *  >=0 if data has been transferred, returns the offset of consumed data.
4698 *  <0 if no data has been transferred.
4699 */
4700int ring_buffer_read_page(struct ring_buffer *buffer,
4701                          void **data_page, size_t len, int cpu, int full)
4702{
4703        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4704        struct ring_buffer_event *event;
4705        struct buffer_data_page *bpage;
4706        struct buffer_page *reader;
4707        unsigned long missed_events;
4708        unsigned long flags;
4709        unsigned int commit;
4710        unsigned int read;
4711        u64 save_timestamp;
4712        int ret = -1;
4713
4714        if (!cpumask_test_cpu(cpu, buffer->cpumask))
4715                goto out;
4716
4717        /*
4718         * If len is not big enough to hold the page header, then
4719         * we can not copy anything.
4720         */
4721        if (len <= BUF_PAGE_HDR_SIZE)
4722                goto out;
4723
4724        len -= BUF_PAGE_HDR_SIZE;
4725
4726        if (!data_page)
4727                goto out;
4728
4729        bpage = *data_page;
4730        if (!bpage)
4731                goto out;
4732
4733        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4734
4735        reader = rb_get_reader_page(cpu_buffer);
4736        if (!reader)
4737                goto out_unlock;
4738
4739        event = rb_reader_event(cpu_buffer);
4740
4741        read = reader->read;
4742        commit = rb_page_commit(reader);
4743
4744        /* Check if any events were dropped */
4745        missed_events = cpu_buffer->lost_events;
4746
4747        /*
4748         * If this page has been partially read or
4749         * if len is not big enough to read the rest of the page or
4750         * a writer is still on the page, then
4751         * we must copy the data from the page to the buffer.
4752         * Otherwise, we can simply swap the page with the one passed in.
4753         */
4754        if (read || (len < (commit - read)) ||
4755            cpu_buffer->reader_page == cpu_buffer->commit_page) {
4756                struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
4757                unsigned int rpos = read;
4758                unsigned int pos = 0;
4759                unsigned int size;
4760
4761                if (full)
4762                        goto out_unlock;
4763
4764                if (len > (commit - read))
4765                        len = (commit - read);
4766
4767                /* Always keep the time extend and data together */
4768                size = rb_event_ts_length(event);
4769
4770                if (len < size)
4771                        goto out_unlock;
4772
4773                /* save the current timestamp, since the user will need it */
4774                save_timestamp = cpu_buffer->read_stamp;
4775
4776                /* Need to copy one event at a time */
4777                do {
4778                        /* We need the size of one event, because
4779                         * rb_advance_reader only advances by one event,
4780                         * whereas rb_event_ts_length may include the size of
4781                         * one or two events.
4782                         * We have already ensured there's enough space if this
4783                         * is a time extend. */
4784                        size = rb_event_length(event);
4785                        memcpy(bpage->data + pos, rpage->data + rpos, size);
4786
4787                        len -= size;
4788
4789                        rb_advance_reader(cpu_buffer);
4790                        rpos = reader->read;
4791                        pos += size;
4792
4793                        if (rpos >= commit)
4794                                break;
4795
4796                        event = rb_reader_event(cpu_buffer);
4797                        /* Always keep the time extend and data together */
4798                        size = rb_event_ts_length(event);
4799                } while (len >= size);
4800
4801                /* update bpage */
4802                local_set(&bpage->commit, pos);
4803                bpage->time_stamp = save_timestamp;
4804
4805                /* we copied everything to the beginning */
4806                read = 0;
4807        } else {
4808                /* update the entry counter */
4809                cpu_buffer->read += rb_page_entries(reader);
4810                cpu_buffer->read_bytes += BUF_PAGE_SIZE;
4811
4812                /* swap the pages */
4813                rb_init_page(bpage);
4814                bpage = reader->page;
4815                reader->page = *data_page;
4816                local_set(&reader->write, 0);
4817                local_set(&reader->entries, 0);
4818                reader->read = 0;
4819                *data_page = bpage;
4820
4821                /*
4822                 * Use the real_end for the data size,
4823                 * This gives us a chance to store the lost events
4824                 * on the page.
4825                 */
4826                if (reader->real_end)
4827                        local_set(&bpage->commit, reader->real_end);
4828        }
4829        ret = read;
4830
4831        cpu_buffer->lost_events = 0;
4832
4833        commit = local_read(&bpage->commit);
4834        /*
4835         * Set a flag in the commit field if we lost events
4836         */
4837        if (missed_events) {
4838                /* If there is room at the end of the page to save the
4839                 * missed events, then record it there.
4840                 */
4841                if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
4842                        memcpy(&bpage->data[commit], &missed_events,
4843                               sizeof(missed_events));
4844                        local_add(RB_MISSED_STORED, &bpage->commit);
4845                        commit += sizeof(missed_events);
4846                }
4847                local_add(RB_MISSED_EVENTS, &bpage->commit);
4848        }
4849
4850        /*
4851         * This page may be off to user land. Zero it out here.
4852         */
4853        if (commit < BUF_PAGE_SIZE)
4854                memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
4855
4856 out_unlock:
4857        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4858
4859 out:
4860        return ret;
4861}
4862EXPORT_SYMBOL_GPL(ring_buffer_read_page);
4863
4864/*
4865 * We only allocate new buffers, never free them if the CPU goes down.
4866 * If we were to free the buffer, then the user would lose any trace that was in
4867 * the buffer.
4868 */
4869int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
4870{
4871        struct ring_buffer *buffer;
4872        long nr_pages_same;
4873        int cpu_i;
4874        unsigned long nr_pages;
4875
4876        buffer = container_of(node, struct ring_buffer, node);
4877        if (cpumask_test_cpu(cpu, buffer->cpumask))
4878                return 0;
4879
4880        nr_pages = 0;
4881        nr_pages_same = 1;
4882        /* check if all cpu sizes are same */
4883        for_each_buffer_cpu(buffer, cpu_i) {
4884                /* fill in the size from first enabled cpu */
4885                if (nr_pages == 0)
4886                        nr_pages = buffer->buffers[cpu_i]->nr_pages;
4887                if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
4888                        nr_pages_same = 0;
4889                        break;
4890                }
4891        }
4892        /* allocate minimum pages, user can later expand it */
4893        if (!nr_pages_same)
4894                nr_pages = 2;
4895        buffer->buffers[cpu] =
4896                rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
4897        if (!buffer->buffers[cpu]) {
4898                WARN(1, "failed to allocate ring buffer on CPU %u\n",
4899                     cpu);
4900                return -ENOMEM;
4901        }
4902        smp_wmb();
4903        cpumask_set_cpu(cpu, buffer->cpumask);
4904        return 0;
4905}
4906
4907#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
4908/*
4909 * This is a basic integrity check of the ring buffer.
4910 * Late in the boot cycle this test will run when configured in.
4911 * It will kick off a thread per CPU that will go into a loop
4912 * writing to the per cpu ring buffer various sizes of data.
4913 * Some of the data will be large items, some small.
4914 *
4915 * Another thread is created that goes into a spin, sending out
4916 * IPIs to the other CPUs to also write into the ring buffer.
4917 * this is to test the nesting ability of the buffer.
4918 *
4919 * Basic stats are recorded and reported. If something in the
4920 * ring buffer should happen that's not expected, a big warning
4921 * is displayed and all ring buffers are disabled.
4922 */
4923static struct task_struct *rb_threads[NR_CPUS] __initdata;
4924
4925struct rb_test_data {
4926        struct ring_buffer      *buffer;
4927        unsigned long           events;
4928        unsigned long           bytes_written;
4929        unsigned long           bytes_alloc;
4930        unsigned long           bytes_dropped;
4931        unsigned long           events_nested;
4932        unsigned long           bytes_written_nested;
4933        unsigned long           bytes_alloc_nested;
4934        unsigned long           bytes_dropped_nested;
4935        int                     min_size_nested;
4936        int                     max_size_nested;
4937        int                     max_size;
4938        int                     min_size;
4939        int                     cpu;
4940        int                     cnt;
4941};
4942
4943static struct rb_test_data rb_data[NR_CPUS] __initdata;
4944
4945/* 1 meg per cpu */
4946#define RB_TEST_BUFFER_SIZE     1048576
4947
4948static char rb_string[] __initdata =
4949        "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
4950        "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
4951        "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
4952
4953static bool rb_test_started __initdata;
4954
4955struct rb_item {
4956        int size;
4957        char str[];
4958};
4959
4960static __init int rb_write_something(struct rb_test_data *data, bool nested)
4961{
4962        struct ring_buffer_event *event;
4963        struct rb_item *item;
4964        bool started;
4965        int event_len;
4966        int size;
4967        int len;
4968        int cnt;
4969
4970        /* Have nested writes different that what is written */
4971        cnt = data->cnt + (nested ? 27 : 0);
4972
4973        /* Multiply cnt by ~e, to make some unique increment */
4974        size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
4975
4976        len = size + sizeof(struct rb_item);
4977
4978        started = rb_test_started;
4979        /* read rb_test_started before checking buffer enabled */
4980        smp_rmb();
4981
4982        event = ring_buffer_lock_reserve(data->buffer, len);
4983        if (!event) {
4984                /* Ignore dropped events before test starts. */
4985                if (started) {
4986                        if (nested)
4987                                data->bytes_dropped += len;
4988                        else
4989                                data->bytes_dropped_nested += len;
4990                }
4991                return len;
4992        }
4993
4994        event_len = ring_buffer_event_length(event);
4995
4996        if (RB_WARN_ON(data->buffer, event_len < len))
4997                goto out;
4998
4999        item = ring_buffer_event_data(event);
5000        item->size = size;
5001        memcpy(item->str, rb_string, size);
5002
5003        if (nested) {
5004                data->bytes_alloc_nested += event_len;
5005                data->bytes_written_nested += len;
5006                data->events_nested++;
5007                if (!data->min_size_nested || len < data->min_size_nested)
5008                        data->min_size_nested = len;
5009                if (len > data->max_size_nested)
5010                        data->max_size_nested = len;
5011        } else {
5012                data->bytes_alloc += event_len;
5013                data->bytes_written += len;
5014                data->events++;
5015                if (!data->min_size || len < data->min_size)
5016                        data->max_size = len;
5017                if (len > data->max_size)
5018                        data->max_size = len;
5019        }
5020
5021 out:
5022        ring_buffer_unlock_commit(data->buffer, event);
5023
5024        return 0;
5025}
5026
5027static __init int rb_test(void *arg)
5028{
5029        struct rb_test_data *data = arg;
5030
5031        while (!kthread_should_stop()) {
5032                rb_write_something(data, false);
5033                data->cnt++;
5034
5035                set_current_state(TASK_INTERRUPTIBLE);
5036                /* Now sleep between a min of 100-300us and a max of 1ms */
5037                usleep_range(((data->cnt % 3) + 1) * 100, 1000);
5038        }
5039
5040        return 0;
5041}
5042
5043static __init void rb_ipi(void *ignore)
5044{
5045        struct rb_test_data *data;
5046        int cpu = smp_processor_id();
5047
5048        data = &rb_data[cpu];
5049        rb_write_something(data, true);
5050}
5051
5052static __init int rb_hammer_test(void *arg)
5053{
5054        while (!kthread_should_stop()) {
5055
5056                /* Send an IPI to all cpus to write data! */
5057                smp_call_function(rb_ipi, NULL, 1);
5058                /* No sleep, but for non preempt, let others run */
5059                schedule();
5060        }
5061
5062        return 0;
5063}
5064
5065static __init int test_ringbuffer(void)
5066{
5067        struct task_struct *rb_hammer;
5068        struct ring_buffer *buffer;
5069        int cpu;
5070        int ret = 0;
5071
5072        if (security_locked_down(LOCKDOWN_TRACEFS)) {
5073                pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
5074                return 0;
5075        }
5076
5077        pr_info("Running ring buffer tests...\n");
5078
5079        buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
5080        if (WARN_ON(!buffer))
5081                return 0;
5082
5083        /* Disable buffer so that threads can't write to it yet */
5084        ring_buffer_record_off(buffer);
5085
5086        for_each_online_cpu(cpu) {
5087                rb_data[cpu].buffer = buffer;
5088                rb_data[cpu].cpu = cpu;
5089                rb_data[cpu].cnt = cpu;
5090                rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
5091                                                 "rbtester/%d", cpu);
5092                if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
5093                        pr_cont("FAILED\n");
5094                        ret = PTR_ERR(rb_threads[cpu]);
5095                        goto out_free;
5096                }
5097
5098                kthread_bind(rb_threads[cpu], cpu);
5099                wake_up_process(rb_threads[cpu]);
5100        }
5101
5102        /* Now create the rb hammer! */
5103        rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
5104        if (WARN_ON(IS_ERR(rb_hammer))) {
5105                pr_cont("FAILED\n");
5106                ret = PTR_ERR(rb_hammer);
5107                goto out_free;
5108        }
5109
5110        ring_buffer_record_on(buffer);
5111        /*
5112         * Show buffer is enabled before setting rb_test_started.
5113         * Yes there's a small race window where events could be
5114         * dropped and the thread wont catch it. But when a ring
5115         * buffer gets enabled, there will always be some kind of
5116         * delay before other CPUs see it. Thus, we don't care about
5117         * those dropped events. We care about events dropped after
5118         * the threads see that the buffer is active.
5119         */
5120        smp_wmb();
5121        rb_test_started = true;
5122
5123        set_current_state(TASK_INTERRUPTIBLE);
5124        /* Just run for 10 seconds */;
5125        schedule_timeout(10 * HZ);
5126
5127        kthread_stop(rb_hammer);
5128
5129 out_free:
5130        for_each_online_cpu(cpu) {
5131                if (!rb_threads[cpu])
5132                        break;
5133                kthread_stop(rb_threads[cpu]);
5134        }
5135        if (ret) {
5136                ring_buffer_free(buffer);
5137                return ret;
5138        }
5139
5140        /* Report! */
5141        pr_info("finished\n");
5142        for_each_online_cpu(cpu) {
5143                struct ring_buffer_event *event;
5144                struct rb_test_data *data = &rb_data[cpu];
5145                struct rb_item *item;
5146                unsigned long total_events;
5147                unsigned long total_dropped;
5148                unsigned long total_written;
5149                unsigned long total_alloc;
5150                unsigned long total_read = 0;
5151                unsigned long total_size = 0;
5152                unsigned long total_len = 0;
5153                unsigned long total_lost = 0;
5154                unsigned long lost;
5155                int big_event_size;
5156                int small_event_size;
5157
5158                ret = -1;
5159
5160                total_events = data->events + data->events_nested;
5161                total_written = data->bytes_written + data->bytes_written_nested;
5162                total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
5163                total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
5164
5165                big_event_size = data->max_size + data->max_size_nested;
5166                small_event_size = data->min_size + data->min_size_nested;
5167
5168                pr_info("CPU %d:\n", cpu);
5169                pr_info("              events:    %ld\n", total_events);
5170                pr_info("       dropped bytes:    %ld\n", total_dropped);
5171                pr_info("       alloced bytes:    %ld\n", total_alloc);
5172                pr_info("       written bytes:    %ld\n", total_written);
5173                pr_info("       biggest event:    %d\n", big_event_size);
5174                pr_info("      smallest event:    %d\n", small_event_size);
5175
5176                if (RB_WARN_ON(buffer, total_dropped))
5177                        break;
5178
5179                ret = 0;
5180
5181                while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
5182                        total_lost += lost;
5183                        item = ring_buffer_event_data(event);
5184                        total_len += ring_buffer_event_length(event);
5185                        total_size += item->size + sizeof(struct rb_item);
5186                        if (memcmp(&item->str[0], rb_string, item->size) != 0) {
5187                                pr_info("FAILED!\n");
5188                                pr_info("buffer had: %.*s\n", item->size, item->str);
5189                                pr_info("expected:   %.*s\n", item->size, rb_string);
5190                                RB_WARN_ON(buffer, 1);
5191                                ret = -1;
5192                                break;
5193                        }
5194                        total_read++;
5195                }
5196                if (ret)
5197                        break;
5198
5199                ret = -1;
5200
5201                pr_info("         read events:   %ld\n", total_read);
5202                pr_info("         lost events:   %ld\n", total_lost);
5203                pr_info("        total events:   %ld\n", total_lost + total_read);
5204                pr_info("  recorded len bytes:   %ld\n", total_len);
5205                pr_info(" recorded size bytes:   %ld\n", total_size);
5206                if (total_lost)
5207                        pr_info(" With dropped events, record len and size may not match\n"
5208                                " alloced and written from above\n");
5209                if (!total_lost) {
5210                        if (RB_WARN_ON(buffer, total_len != total_alloc ||
5211                                       total_size != total_written))
5212                                break;
5213                }
5214                if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
5215                        break;
5216
5217                ret = 0;
5218        }
5219        if (!ret)
5220                pr_info("Ring buffer PASSED!\n");
5221
5222        ring_buffer_free(buffer);
5223        return 0;
5224}
5225
5226late_initcall(test_ringbuffer);
5227#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
5228