ring_buffer: implement new locking
The old "lock always" scheme had issues with lockdep, and was not very
efficient anyways.
This patch does a new design to be partially lockless on writes.
Writes will add new entries to the per cpu pages by simply disabling
interrupts. When a write needs to go to another page than it will
grab the lock.
A new "read page" has been added so that the reader can pull out a page
from the ring buffer to read without worrying about the writer writing over
it. This allows us to not take the lock for all reads. The lock is
now only taken when a read needs to go to a new page.
This is far from lockless, and interrupts still need to be disabled,
but it is a step towards a more lockless solution, and it also
solves a lot of the issues that were noticed by the first conversion
of ftrace to the ring buffers.
Note: the ring_buffer_{un}lock API has been removed.
Signed-off-by: Steven Rostedt <srostedt@redhat.com>
Signed-off-by: Ingo Molnar <mingo@elte.hu>
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 8e7392f..9631abf 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -161,8 +161,10 @@
struct list_head pages;
unsigned long head; /* read from head */
unsigned long tail; /* write to tail */
+ unsigned long reader;
struct buffer_page *head_page;
struct buffer_page *tail_page;
+ struct buffer_page *reader_page;
unsigned long overrun;
unsigned long entries;
u64 write_stamp;
@@ -260,6 +262,7 @@
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
struct ring_buffer_per_cpu *cpu_buffer;
+ unsigned long addr;
int ret;
cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
@@ -272,9 +275,16 @@
spin_lock_init(&cpu_buffer->lock);
INIT_LIST_HEAD(&cpu_buffer->pages);
+ addr = __get_free_page(GFP_KERNEL);
+ if (!addr)
+ goto fail_free_buffer;
+ cpu_buffer->reader_page = (struct buffer_page *)virt_to_page(addr);
+ INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
+ cpu_buffer->reader_page->size = 0;
+
ret = rb_allocate_pages(cpu_buffer, buffer->pages);
if (ret < 0)
- goto fail_free_buffer;
+ goto fail_free_reader;
cpu_buffer->head_page
= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
@@ -283,6 +293,9 @@
return cpu_buffer;
+ fail_free_reader:
+ free_buffer_page(cpu_buffer->reader_page);
+
fail_free_buffer:
kfree(cpu_buffer);
return NULL;
@@ -293,6 +306,9 @@
struct list_head *head = &cpu_buffer->pages;
struct buffer_page *page, *tmp;
+ list_del_init(&cpu_buffer->reader_page->list);
+ free_buffer_page(cpu_buffer->reader_page);
+
list_for_each_entry_safe(page, tmp, head, list) {
list_del_init(&page->list);
free_buffer_page(page);
@@ -538,8 +554,10 @@
static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
- return cpu_buffer->head_page == cpu_buffer->tail_page &&
- cpu_buffer->head == cpu_buffer->tail;
+ return (cpu_buffer->reader == cpu_buffer->reader_page->size &&
+ (cpu_buffer->tail_page == cpu_buffer->reader_page ||
+ (cpu_buffer->tail_page == cpu_buffer->head_page &&
+ cpu_buffer->head == cpu_buffer->tail)));
}
static inline int rb_null_event(struct ring_buffer_event *event)
@@ -555,10 +573,10 @@
}
static inline struct ring_buffer_event *
-rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
+rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
- return rb_page_index(cpu_buffer->head_page,
- cpu_buffer->head);
+ return rb_page_index(cpu_buffer->reader_page,
+ cpu_buffer->reader);
}
static inline struct ring_buffer_event *
@@ -610,15 +628,32 @@
cpu_buffer->write_stamp = *ts;
}
-static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
+static void rb_reset_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
- cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
cpu_buffer->head = 0;
}
-static void
-rb_reset_iter_read_page(struct ring_buffer_iter *iter)
+static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
+ cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
+ cpu_buffer->reader = 0;
+}
+
+static inline void rb_inc_iter(struct ring_buffer_iter *iter)
+{
+ struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
+
+ /*
+ * The iterator could be on the reader page (it starts there).
+ * But the head could have moved, since the reader was
+ * found. Check for this case and assign the iterator
+ * to the head page instead of next.
+ */
+ if (iter->head_page == cpu_buffer->reader_page)
+ iter->head_page = cpu_buffer->head_page;
+ else
+ rb_inc_page(cpu_buffer, &iter->head_page);
+
iter->read_stamp = iter->head_page->time_stamp;
iter->head = 0;
}
@@ -693,30 +728,39 @@
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
unsigned type, unsigned long length, u64 *ts)
{
- struct buffer_page *head_page, *tail_page;
+ struct buffer_page *tail_page, *head_page, *reader_page;
unsigned long tail;
struct ring_buffer *buffer = cpu_buffer->buffer;
struct ring_buffer_event *event;
+ /* No locking needed for tail page */
tail_page = cpu_buffer->tail_page;
- head_page = cpu_buffer->head_page;
tail = cpu_buffer->tail;
if (tail + length > BUF_PAGE_SIZE) {
struct buffer_page *next_page = tail_page;
+ spin_lock(&cpu_buffer->lock);
rb_inc_page(cpu_buffer, &next_page);
+ head_page = cpu_buffer->head_page;
+ reader_page = cpu_buffer->reader_page;
+
+ /* we grabbed the lock before incrementing */
+ WARN_ON(next_page == reader_page);
+
if (next_page == head_page) {
- if (!(buffer->flags & RB_FL_OVERWRITE))
+ if (!(buffer->flags & RB_FL_OVERWRITE)) {
+ spin_unlock(&cpu_buffer->lock);
return NULL;
+ }
/* count overflows */
rb_update_overflow(cpu_buffer);
rb_inc_page(cpu_buffer, &head_page);
cpu_buffer->head_page = head_page;
- rb_reset_read_page(cpu_buffer);
+ rb_reset_head_page(cpu_buffer);
}
if (tail != BUF_PAGE_SIZE) {
@@ -732,6 +776,7 @@
cpu_buffer->tail_page = tail_page;
cpu_buffer->tail = tail;
rb_add_stamp(cpu_buffer, ts);
+ spin_unlock(&cpu_buffer->lock);
}
BUG_ON(tail + length > BUF_PAGE_SIZE);
@@ -802,7 +847,9 @@
return NULL;
}
} else {
+ spin_lock(&cpu_buffer->lock);
rb_add_stamp(cpu_buffer, &ts);
+ spin_unlock(&cpu_buffer->lock);
delta = 0;
}
@@ -851,13 +898,12 @@
cpu = raw_smp_processor_id();
if (!cpu_isset(cpu, buffer->cpumask))
- goto out_irq;
+ goto out;
cpu_buffer = buffer->buffers[cpu];
- spin_lock(&cpu_buffer->lock);
if (atomic_read(&cpu_buffer->record_disabled))
- goto no_record;
+ goto out;
length = rb_calculate_event_length(length);
if (length > BUF_PAGE_SIZE)
@@ -865,13 +911,11 @@
event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
if (!event)
- goto no_record;
+ goto out;
return event;
- no_record:
- spin_unlock(&cpu_buffer->lock);
- out_irq:
+ out:
local_irq_restore(*flags);
return NULL;
}
@@ -904,11 +948,8 @@
cpu_buffer = buffer->buffers[cpu];
- assert_spin_locked(&cpu_buffer->lock);
-
rb_commit(cpu_buffer, event);
- spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
return 0;
@@ -945,10 +986,9 @@
cpu = raw_smp_processor_id();
if (!cpu_isset(cpu, buffer->cpumask))
- goto out_irq;
+ goto out;
cpu_buffer = buffer->buffers[cpu];
- spin_lock(&cpu_buffer->lock);
if (atomic_read(&cpu_buffer->record_disabled))
goto out;
@@ -967,56 +1007,12 @@
ret = 0;
out:
- spin_unlock(&cpu_buffer->lock);
- out_irq:
local_irq_restore(flags);
return ret;
}
/**
- * ring_buffer_lock - lock the ring buffer
- * @buffer: The ring buffer to lock
- * @flags: The place to store the interrupt flags
- *
- * This locks all the per CPU buffers.
- *
- * Must be unlocked by ring_buffer_unlock.
- */
-void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
-{
- struct ring_buffer_per_cpu *cpu_buffer;
- int cpu;
-
- local_irq_save(*flags);
-
- for_each_buffer_cpu(buffer, cpu) {
- cpu_buffer = buffer->buffers[cpu];
- spin_lock(&cpu_buffer->lock);
- }
-}
-
-/**
- * ring_buffer_unlock - unlock a locked buffer
- * @buffer: The locked buffer to unlock
- * @flags: The interrupt flags received by ring_buffer_lock
- */
-void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
-{
- struct ring_buffer_per_cpu *cpu_buffer;
- int cpu;
-
- for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
- if (!cpu_isset(cpu, buffer->cpumask))
- continue;
- cpu_buffer = buffer->buffers[cpu];
- spin_unlock(&cpu_buffer->lock);
- }
-
- local_irq_restore(flags);
-}
-
-/**
* ring_buffer_record_disable - stop all writes into the buffer
* @buffer: The ring buffer to stop writes to.
*
@@ -1169,9 +1165,18 @@
{
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
- iter->head_page = cpu_buffer->head_page;
- iter->head = cpu_buffer->head;
- rb_reset_iter_read_page(iter);
+ /* Iterator usage is expected to have record disabled */
+ if (list_empty(&cpu_buffer->reader_page->list)) {
+ iter->head_page = cpu_buffer->head_page;
+ iter->head = cpu_buffer->head;
+ } else {
+ iter->head_page = cpu_buffer->reader_page;
+ iter->head = cpu_buffer->reader;
+ }
+ if (iter->head)
+ iter->read_stamp = cpu_buffer->read_stamp;
+ else
+ iter->read_stamp = iter->head_page->time_stamp;
}
/**
@@ -1250,43 +1255,84 @@
return;
}
-static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
+static struct buffer_page *
+rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
- struct ring_buffer_event *event;
- unsigned length;
+ struct buffer_page *reader = NULL;
+ unsigned long flags;
+
+ spin_lock_irqsave(&cpu_buffer->lock, flags);
+
+ again:
+ reader = cpu_buffer->reader_page;
+
+ /* If there's more to read, return this page */
+ if (cpu_buffer->reader < reader->size)
+ goto out;
+
+ /* Never should we have an index greater than the size */
+ WARN_ON(cpu_buffer->reader > reader->size);
+
+ /* check if we caught up to the tail */
+ reader = NULL;
+ if (cpu_buffer->tail_page == cpu_buffer->reader_page)
+ goto out;
/*
- * Check if we are at the end of the buffer.
+ * Splice the empty reader page into the list around the head.
+ * Reset the reader page to size zero.
*/
- if (cpu_buffer->head >= cpu_buffer->head_page->size) {
- BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
- rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
- rb_reset_read_page(cpu_buffer);
- return;
- }
- event = rb_head_event(cpu_buffer);
+ reader = cpu_buffer->head_page;
+ cpu_buffer->reader_page->list.next = reader->list.next;
+ cpu_buffer->reader_page->list.prev = reader->list.prev;
+ cpu_buffer->reader_page->size = 0;
+
+ /* Make the reader page now replace the head */
+ reader->list.prev->next = &cpu_buffer->reader_page->list;
+ reader->list.next->prev = &cpu_buffer->reader_page->list;
+
+ /*
+ * If the tail is on the reader, then we must set the head
+ * to the inserted page, otherwise we set it one before.
+ */
+ cpu_buffer->head_page = cpu_buffer->reader_page;
+
+ if (cpu_buffer->tail_page != reader)
+ rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
+
+ /* Finally update the reader page to the new head */
+ cpu_buffer->reader_page = reader;
+ rb_reset_reader_page(cpu_buffer);
+
+ goto again;
+
+ out:
+ spin_unlock_irqrestore(&cpu_buffer->lock, flags);
+
+ return reader;
+}
+
+static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ struct ring_buffer_event *event;
+ struct buffer_page *reader;
+ unsigned length;
+
+ reader = rb_get_reader_page(cpu_buffer);
+
+ /* This function should not be called when buffer is empty */
+ BUG_ON(!reader);
+
+ event = rb_reader_event(cpu_buffer);
if (event->type == RINGBUF_TYPE_DATA)
cpu_buffer->entries--;
- length = rb_event_length(event);
-
- /*
- * This should not be called to advance the header if we are
- * at the tail of the buffer.
- */
- BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
- (cpu_buffer->head + length > cpu_buffer->tail));
-
rb_update_read_stamp(cpu_buffer, event);
- cpu_buffer->head += length;
-
- /* check for end of page */
- if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
- (cpu_buffer->head_page != cpu_buffer->tail_page))
- rb_advance_head(cpu_buffer);
+ length = rb_event_length(event);
+ cpu_buffer->reader += length;
}
static void rb_advance_iter(struct ring_buffer_iter *iter)
@@ -1304,8 +1350,7 @@
*/
if (iter->head >= iter->head_page->size) {
BUG_ON(iter->head_page == cpu_buffer->tail_page);
- rb_inc_page(cpu_buffer, &iter->head_page);
- rb_reset_iter_read_page(iter);
+ rb_inc_iter(iter);
return;
}
@@ -1344,6 +1389,7 @@
{
struct ring_buffer_per_cpu *cpu_buffer;
struct ring_buffer_event *event;
+ struct buffer_page *reader;
if (!cpu_isset(cpu, buffer->cpumask))
return NULL;
@@ -1351,25 +1397,26 @@
cpu_buffer = buffer->buffers[cpu];
again:
- if (rb_per_cpu_empty(cpu_buffer))
+ reader = rb_get_reader_page(cpu_buffer);
+ if (!reader)
return NULL;
- event = rb_head_event(cpu_buffer);
+ event = rb_reader_event(cpu_buffer);
switch (event->type) {
case RINGBUF_TYPE_PADDING:
- rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
- rb_reset_read_page(cpu_buffer);
- goto again;
+ WARN_ON(1);
+ rb_advance_reader(cpu_buffer);
+ return NULL;
case RINGBUF_TYPE_TIME_EXTEND:
/* Internal data, OK to advance */
- rb_advance_head(cpu_buffer);
+ rb_advance_reader(cpu_buffer);
goto again;
case RINGBUF_TYPE_TIME_STAMP:
/* FIXME: not implemented */
- rb_advance_head(cpu_buffer);
+ rb_advance_reader(cpu_buffer);
goto again;
case RINGBUF_TYPE_DATA:
@@ -1415,8 +1462,7 @@
switch (event->type) {
case RINGBUF_TYPE_PADDING:
- rb_inc_page(cpu_buffer, &iter->head_page);
- rb_reset_iter_read_page(iter);
+ rb_inc_iter(iter);
goto again;
case RINGBUF_TYPE_TIME_EXTEND:
@@ -1465,7 +1511,7 @@
return NULL;
cpu_buffer = buffer->buffers[cpu];
- rb_advance_head(cpu_buffer);
+ rb_advance_reader(cpu_buffer);
return event;
}
@@ -1487,6 +1533,7 @@
{
struct ring_buffer_per_cpu *cpu_buffer;
struct ring_buffer_iter *iter;
+ unsigned long flags;
if (!cpu_isset(cpu, buffer->cpumask))
return NULL;
@@ -1502,11 +1549,9 @@
atomic_inc(&cpu_buffer->record_disabled);
synchronize_sched();
- spin_lock(&cpu_buffer->lock);
- iter->head = cpu_buffer->head;
- iter->head_page = cpu_buffer->head_page;
- rb_reset_iter_read_page(iter);
- spin_unlock(&cpu_buffer->lock);
+ spin_lock_irqsave(&cpu_buffer->lock, flags);
+ ring_buffer_iter_reset(iter);
+ spin_unlock_irqrestore(&cpu_buffer->lock, flags);
return iter;
}
@@ -1562,10 +1607,14 @@
{
cpu_buffer->head_page
= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
- cpu_buffer->tail_page
- = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
+ cpu_buffer->head_page->size = 0;
+ cpu_buffer->tail_page = cpu_buffer->head_page;
+ cpu_buffer->tail_page->size = 0;
+ INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
+ cpu_buffer->reader_page->size = 0;
- cpu_buffer->head = cpu_buffer->tail = 0;
+ cpu_buffer->head = cpu_buffer->tail = cpu_buffer->reader = 0;
+
cpu_buffer->overrun = 0;
cpu_buffer->entries = 0;
}
@@ -1583,13 +1632,11 @@
if (!cpu_isset(cpu, buffer->cpumask))
return;
- local_irq_save(flags);
- spin_lock(&cpu_buffer->lock);
+ spin_lock_irqsave(&cpu_buffer->lock, flags);
rb_reset_cpu(cpu_buffer);
- spin_unlock(&cpu_buffer->lock);
- local_irq_restore(flags);
+ spin_unlock_irqrestore(&cpu_buffer->lock, flags);
}
/**
@@ -1598,15 +1645,10 @@
*/
void ring_buffer_reset(struct ring_buffer *buffer)
{
- unsigned long flags;
int cpu;
- ring_buffer_lock(buffer, &flags);
-
for_each_buffer_cpu(buffer, cpu)
- rb_reset_cpu(buffer->buffers[cpu]);
-
- ring_buffer_unlock(buffer, flags);
+ ring_buffer_reset_cpu(buffer, cpu);
}
/**