Commit 73131e99 authored by Phillip Webb's avatar Phillip Webb

Make BufferingApplicationStartup thread safe

Update `BufferingApplicationStartup` to use thread safe data structures.

Prior to this commit, it was possible for calls from different threads
(for example due to request scope beans) to cause a
NoSuchElementException to be thrown.

Closes gh-25792
parent fd3d6196
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -46,11 +46,10 @@ class StartupEndpointDocumentationTests extends MockMvcEndpointDocumentationTest
void appendSampleStartupSteps(@Autowired BufferingApplicationStartup applicationStartup) {
StartupStep starting = applicationStartup.start("spring.boot.application.starting");
starting.tag("mainApplicationClass", "com.example.startup.StartupApplication");
starting.end();
StartupStep instantiate = applicationStartup.start("spring.beans.instantiate");
instantiate.tag("beanName", "homeController");
instantiate.end();
starting.end();
}
@Test
......@@ -67,14 +66,13 @@ class StartupEndpointDocumentationTests extends MockMvcEndpointDocumentationTest
fieldWithPath("timeline.events.[].startupStep.name").description("The name of the StartupStep."),
fieldWithPath("timeline.events.[].startupStep.id").description("The id of this StartupStep."),
fieldWithPath("timeline.events.[].startupStep.parentId")
.description("The parent id for this StartupStep."),
.description("The parent id for this StartupStep.").optional(),
fieldWithPath("timeline.events.[].startupStep.tags")
.description("An array of key/value pairs with additional step info."),
fieldWithPath("timeline.events.[].startupStep.tags[].key")
.description("The key of the StartupStep Tag."),
fieldWithPath("timeline.events.[].startupStep.tags[].value")
.description("The value of the StartupStep Tag."));
this.mockMvc.perform(post("/actuator/startup")).andExpect(status().isOk())
.andDo(document("startup", responseFields));
}
......
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,11 +16,16 @@
package org.springframework.boot.context.metrics.buffering;
import java.util.Iterator;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.core.metrics.StartupStep;
import org.springframework.util.Assert;
/**
* {@link StartupStep} implementation to be buffered by
......@@ -28,6 +33,7 @@ import org.springframework.core.metrics.StartupStep;
* {@link System#nanoTime()}.
*
* @author Brian Clozel
* @author Phillip Webb
*/
class BufferedStartupStep implements StartupStep {
......@@ -35,24 +41,29 @@ class BufferedStartupStep implements StartupStep {
private final long id;
private final Long parentId;
private final BufferedStartupStep parent;
private long startTime;
private final List<Tag> tags = new ArrayList<>();
private long endTime;
private final Consumer<BufferedStartupStep> recorder;
private final DefaultTags tags;
private final Instant startTime;
private final Consumer<BufferedStartupStep> recorder;
private final AtomicBoolean ended = new AtomicBoolean();
BufferedStartupStep(long id, String name, Long parentId, Consumer<BufferedStartupStep> recorder) {
this.id = id;
this.parentId = parentId;
this.tags = new DefaultTags();
BufferedStartupStep(BufferedStartupStep parent, String name, long id, Instant startTime,
Consumer<BufferedStartupStep> recorder) {
this.parent = parent;
this.name = name;
this.id = id;
this.startTime = startTime;
this.recorder = recorder;
}
BufferedStartupStep getParent() {
return this.parent;
}
@Override
public String getName() {
return this.name;
......@@ -63,88 +74,40 @@ class BufferedStartupStep implements StartupStep {
return this.id;
}
Instant getStartTime() {
return this.startTime;
}
@Override
public Long getParentId() {
return this.parentId;
return (this.parent != null) ? this.parent.getId() : null;
}
@Override
public Tags getTags() {
return this.tags;
return Collections.unmodifiableList(this.tags)::iterator;
}
@Override
public StartupStep tag(String key, String value) {
if (this.endTime != 0L) {
throw new IllegalStateException("StartupStep has already ended.");
}
this.tags.add(key, value);
return this;
public StartupStep tag(String key, Supplier<String> value) {
return this.tag(key, value.get());
}
@Override
public StartupStep tag(String key, Supplier<String> value) {
return this.tag(key, value.get());
public StartupStep tag(String key, String value) {
Assert.state(!this.ended.get(), "StartupStep has already ended.");
this.tags.add(new DefaultTag(key, value));
return this;
}
@Override
public void end() {
this.ended.set(true);
this.recorder.accept(this);
}
long getStartTime() {
return this.startTime;
}
void recordStartTime(long startTime) {
this.startTime = startTime;
}
long getEndTime() {
return this.endTime;
}
void recordEndTime(long endTime) {
this.endTime = endTime;
}
static class DefaultTags implements Tags {
private Tag[] tags = new Tag[0];
void add(String key, String value) {
Tag[] newTags = new Tag[this.tags.length + 1];
System.arraycopy(this.tags, 0, newTags, 0, this.tags.length);
newTags[newTags.length - 1] = new DefaultTag(key, value);
this.tags = newTags;
}
@Override
public Iterator<Tag> iterator() {
return new TagsIterator();
}
private class TagsIterator implements Iterator<Tag> {
private int index = 0;
@Override
public boolean hasNext() {
return this.index < DefaultTags.this.tags.length;
}
@Override
public Tag next() {
return DefaultTags.this.tags[this.index++];
}
@Override
public void remove() {
throw new UnsupportedOperationException("tags are append only");
}
}
boolean isEnded() {
return this.ended.get();
}
static class DefaultTag implements Tag {
......
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,15 +16,17 @@
package org.springframework.boot.context.metrics.buffering;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.springframework.boot.context.metrics.buffering.StartupTimeline.TimelineEvent;
import org.springframework.core.metrics.ApplicationStartup;
import org.springframework.core.metrics.StartupStep;
import org.springframework.util.Assert;
......@@ -45,21 +47,26 @@ import org.springframework.util.Assert;
* </ul>
*
* @author Brian Clozel
* @author Phillip Webb
* @since 2.4.0
*/
public class BufferingApplicationStartup implements ApplicationStartup {
private Instant recordingStartTime;
private final int capacity;
private long recordingStartNanos;
private final Clock clock;
private long currentSequenceId = 0;
private Instant startTime;
private final Deque<Long> currentSteps;
private final AtomicInteger idSeq = new AtomicInteger();
private final BlockingQueue<BufferedStartupStep> recordedSteps;
private Predicate<StartupStep> filter = (step) -> true;
private Predicate<StartupStep> stepFilters = (step) -> true;
private final AtomicReference<BufferedStartupStep> current = new AtomicReference<>();
private final AtomicInteger estimatedSize = new AtomicInteger();
private final ConcurrentLinkedQueue<TimelineEvent> events = new ConcurrentLinkedQueue<>();
/**
* Create a new buffered {@link ApplicationStartup} with a limited capacity and starts
......@@ -67,10 +74,13 @@ public class BufferingApplicationStartup implements ApplicationStartup {
* @param capacity the configured capacity; once reached, new steps are not recorded.
*/
public BufferingApplicationStartup(int capacity) {
this.currentSteps = new ArrayDeque<>();
this.currentSteps.offerFirst(this.currentSequenceId);
this.recordedSteps = new LinkedBlockingQueue<>(capacity);
startRecording();
this(capacity, Clock.systemDefaultZone());
}
BufferingApplicationStartup(int capacity, Clock clock) {
this.capacity = capacity;
this.clock = clock;
this.startTime = clock.instant();
}
/**
......@@ -81,9 +91,8 @@ public class BufferingApplicationStartup implements ApplicationStartup {
* already.
*/
public void startRecording() {
Assert.state(this.recordedSteps.isEmpty(), "Cannot restart recording once steps have been buffered.");
this.recordingStartTime = Instant.now();
this.recordingStartNanos = getCurrentTime();
Assert.state(this.events.isEmpty(), "Cannot restart recording once steps have been buffered.");
this.startTime = this.clock.instant();
}
/**
......@@ -93,7 +102,42 @@ public class BufferingApplicationStartup implements ApplicationStartup {
* @param filter the predicate filter to add.
*/
public void addFilter(Predicate<StartupStep> filter) {
this.stepFilters = this.stepFilters.and(filter);
this.filter = this.filter.and(filter);
}
@Override
public StartupStep start(String name) {
int id = this.idSeq.getAndIncrement();
Instant start = this.clock.instant();
while (true) {
BufferedStartupStep current = this.current.get();
BufferedStartupStep parent = getLatestActive(current);
BufferedStartupStep next = new BufferedStartupStep(parent, name, id, start, this::record);
if (this.current.compareAndSet(current, next)) {
return next;
}
}
}
private void record(BufferedStartupStep step) {
if (this.filter.test(step) && this.estimatedSize.get() < this.capacity) {
this.estimatedSize.incrementAndGet();
this.events.add(new TimelineEvent(step, this.clock.instant()));
}
while (true) {
BufferedStartupStep current = this.current.get();
BufferedStartupStep next = getLatestActive(current);
if (this.current.compareAndSet(current, next)) {
return;
}
}
}
private BufferedStartupStep getLatestActive(BufferedStartupStep step) {
while (step != null && step.isEnded()) {
step = step.getParent();
}
return step;
}
/**
......@@ -105,7 +149,7 @@ public class BufferingApplicationStartup implements ApplicationStartup {
* @return a snapshot of currently buffered steps.
*/
public StartupTimeline getBufferedTimeline() {
return new StartupTimeline(this.recordingStartTime, this.recordingStartNanos, this.recordedSteps);
return new StartupTimeline(this.startTime, new ArrayList<>(this.events));
}
/**
......@@ -116,30 +160,14 @@ public class BufferingApplicationStartup implements ApplicationStartup {
* @return buffered steps drained from the buffer.
*/
public StartupTimeline drainBufferedTimeline() {
List<BufferedStartupStep> steps = new ArrayList<>(this.recordedSteps.size());
this.recordedSteps.drainTo(steps);
return new StartupTimeline(this.recordingStartTime, this.recordingStartNanos, steps);
}
@Override
public StartupStep start(String name) {
BufferedStartupStep step = new BufferedStartupStep(++this.currentSequenceId, name,
this.currentSteps.peekFirst(), this::record);
step.recordStartTime(getCurrentTime());
this.currentSteps.offerFirst(this.currentSequenceId);
return step;
}
private void record(BufferedStartupStep step) {
step.recordEndTime(getCurrentTime());
if (this.stepFilters.test(step)) {
this.recordedSteps.offer(step);
List<TimelineEvent> events = new ArrayList<>();
Iterator<TimelineEvent> iterator = this.events.iterator();
while (iterator.hasNext()) {
events.add(iterator.next());
iterator.remove();
}
this.currentSteps.removeFirst();
}
private long getCurrentTime() {
return System.nanoTime();
this.estimatedSize.set(0);
return new StartupTimeline(this.startTime, events);
}
}
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -18,9 +18,8 @@ package org.springframework.boot.context.metrics.buffering;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.core.metrics.StartupStep;
......@@ -38,10 +37,9 @@ public class StartupTimeline {
private final List<TimelineEvent> events;
StartupTimeline(Instant startTime, long startNanoTime, Collection<BufferedStartupStep> events) {
StartupTimeline(Instant startTime, List<TimelineEvent> events) {
this.startTime = startTime;
this.events = events.stream().map((event) -> new TimelineEvent(event, startTime, startNanoTime))
.collect(Collectors.toList());
this.events = Collections.unmodifiableList(events);
}
/**
......@@ -67,19 +65,16 @@ public class StartupTimeline {
*/
public static class TimelineEvent {
private final StartupStep startupStep;
private final Instant startTime;
private final BufferedStartupStep step;
private final Instant endTime;
private final Duration duration;
TimelineEvent(BufferedStartupStep startupStep, Instant startupDate, long startupNanoTime) {
this.startupStep = startupStep;
this.startTime = startupDate.plus(Duration.ofNanos(startupStep.getStartTime() - startupNanoTime));
this.endTime = startupDate.plus(Duration.ofNanos(startupStep.getEndTime() - startupNanoTime));
this.duration = Duration.ofNanos(startupStep.getEndTime() - startupStep.getStartTime());
TimelineEvent(BufferedStartupStep step, Instant endTime) {
this.step = step;
this.endTime = endTime;
this.duration = Duration.between(step.getStartTime(), endTime);
}
/**
......@@ -87,7 +82,7 @@ public class StartupTimeline {
* @return the start time
*/
public Instant getStartTime() {
return this.startTime;
return this.step.getStartTime();
}
/**
......@@ -112,7 +107,7 @@ public class StartupTimeline {
* @return the step information.
*/
public StartupStep getStartupStep() {
return this.startupStep;
return this.step;
}
}
......
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,8 +16,14 @@
package org.springframework.boot.context.metrics.buffering;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.metrics.buffering.StartupTimeline.TimelineEvent;
import org.springframework.core.metrics.StartupStep;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -27,6 +33,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
* Tests for {@link BufferingApplicationStartup}.
*
* @author Brian Clozel
* @author Phillip Webb
*/
class BufferingApplicationStartupTests {
......@@ -47,13 +54,14 @@ class BufferingApplicationStartupTests {
StartupStep filtered = applicationStartup.start("filtered.second");
applicationStartup.start("spring.third").end();
filtered.end();
assertThat(applicationStartup.getBufferedTimeline().getEvents()).hasSize(2);
StartupTimeline.TimelineEvent firstEvent = applicationStartup.getBufferedTimeline().getEvents().get(0);
assertThat(firstEvent.getStartupStep().getId()).isEqualTo(1);
assertThat(firstEvent.getStartupStep().getParentId()).isEqualTo(0);
StartupTimeline.TimelineEvent secondEvent = applicationStartup.getBufferedTimeline().getEvents().get(1);
assertThat(secondEvent.getStartupStep().getId()).isEqualTo(3);
assertThat(secondEvent.getStartupStep().getParentId()).isEqualTo(2);
List<TimelineEvent> events = applicationStartup.getBufferedTimeline().getEvents();
assertThat(events).hasSize(2);
StartupTimeline.TimelineEvent firstEvent = events.get(0);
assertThat(firstEvent.getStartupStep().getId()).isEqualTo(0);
assertThat(firstEvent.getStartupStep().getParentId()).isNull();
StartupTimeline.TimelineEvent secondEvent = events.get(1);
assertThat(secondEvent.getStartupStep().getId()).isEqualTo(2);
assertThat(secondEvent.getStartupStep().getParentId()).isEqualTo(1);
}
@Test
......@@ -96,8 +104,53 @@ class BufferingApplicationStartupTests {
BufferingApplicationStartup applicationStartup = new BufferingApplicationStartup(2);
StartupStep step = applicationStartup.start("first");
step.tag("name", "value");
assertThatThrownBy(() -> step.getTags().iterator().remove()).isInstanceOf(UnsupportedOperationException.class)
.hasMessage("tags are append only");
assertThatThrownBy(() -> step.getTags().iterator().remove()).isInstanceOf(UnsupportedOperationException.class);
}
@Test // gh-25792
void outOfOrderWithMultipleEndCallsShouldNotFail() {
BufferingApplicationStartup applicationStartup = new BufferingApplicationStartup(200);
StartupStep one = applicationStartup.start("one");
StartupStep two = applicationStartup.start("two");
StartupStep three = applicationStartup.start("three");
two.end();
two.end();
two.end();
StartupStep four = applicationStartup.start("four");
four.end();
three.end();
one.end();
}
@Test // gh-25792
void multiThreadedAccessShouldWork() throws InterruptedException {
BufferingApplicationStartup applicationStartup = new BufferingApplicationStartup(5000);
Queue<Exception> errors = new ConcurrentLinkedQueue<>();
List<Thread> threads = new ArrayList<>();
for (int thread = 0; thread < 20; thread++) {
String prefix = "thread-" + thread + "-";
threads.add(new Thread(() -> {
try {
for (int i = 0; i < 100; i++) {
StartupStep step = applicationStartup.start(prefix + i);
try {
Thread.sleep(1);
}
catch (InterruptedException ex) {
}
step.end();
}
}
catch (Exception ex) {
errors.add(ex);
}
}));
}
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
assertThat(errors).isEmpty();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment