diff --git a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java index e34908f01c..53de1b2df2 100644 --- a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java +++ b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java @@ -59,6 +59,11 @@ public class ServerHttpObservationFilter implements WebFilter { private static final Set DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException", "ClientAbortException", "EOFException", "EofException"); + /** + * Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core. + */ + private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation"; + private final ObservationRegistry observationRegistry; private final ServerRequestObservationConvention observationConvention; @@ -117,7 +122,8 @@ public class ServerHttpObservationFilter implements WebFilter { .doOnCancel(() -> { observationContext.setConnectionAborted(true); observation.stop(); - }); + }) + .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation)); } private void onTerminalSignal(Observation observation, ServerWebExchange exchange) { diff --git a/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java b/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java index 79b2f3bef5..0f84861dc2 100644 --- a/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java +++ b/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java @@ -19,6 +19,7 @@ package org.springframework.web.filter.reactive; import java.util.Optional; +import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; import org.assertj.core.api.ThrowingConsumer; @@ -59,6 +60,18 @@ class ServerHttpObservationFilterTests { assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS"); } + @Test + void filterShouldAddNewObservationToReactorContext() { + ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource")); + exchange.getResponse().setRawStatusCode(200); + WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> { + assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent(); + return Mono.empty(); + }); + this.filter.filter(exchange, filterChain).block(); + assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS"); + } + @Test void filterShouldUseThrownException() { ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource")); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java index 36423eb381..dad692438f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java @@ -27,21 +27,24 @@ import org.springframework.lang.Nullable; * @author Brian Clozel * @since 6.0 */ -public class ClientRequestObservationContext extends RequestReplySenderContext { +public class ClientRequestObservationContext extends RequestReplySenderContext { @Nullable private String uriTemplate; private boolean aborted; + @Nullable + private ClientRequest builtRequest; + public ClientRequestObservationContext() { super(ClientRequestObservationContext::setRequestHeader); } - private static void setRequestHeader(@Nullable ClientRequest request, String name, String value) { + private static void setRequestHeader(@Nullable ClientRequest.Builder request, String name, String value) { if (request != null) { - request.headers().set(name, value); + request.header(name, value); } } @@ -75,4 +78,18 @@ public class ClientRequestObservationContext extends RequestReplySenderContext exchange() { ClientRequestObservationContext observationContext = new ClientRequestObservationContext(); - ClientRequest request = (this.inserter != null ? - initRequestBuilder().body(this.inserter).build() : - initRequestBuilder().build()); - return Mono.defer(() -> { + ClientRequest.Builder requestBuilder = this.inserter != null ? + initRequestBuilder().body(this.inserter) : + initRequestBuilder(); + return Mono.deferContextual(contextView -> { Observation observation = ClientHttpObservationDocumentation.HTTP_REQUEST.observation(observationConvention, - DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry).start(); - observationContext.setCarrier(request); + DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry); + observationContext.setCarrier(requestBuilder); + observation + .parentObservation(contextView.getOrDefault(MICROMETER_OBSERVATION, null)) + .start(); + ClientRequest request = requestBuilder.build(); observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null)); + observationContext.setBuiltRequest(request); Mono responseMono = exchangeFunction.exchange(request) .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]") .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java index 2e3d3003f0..c2bb58789b 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java @@ -44,7 +44,8 @@ class DefaultClientRequestObservationConventionTests { @Test void shouldHaveContextualName() { ClientRequestObservationContext context = new ClientRequestObservationContext(); - context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")).build()); + context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test"))); + context.setBuiltRequest(context.getCarrier().build()); assertThat(this.observationConvention.getContextualName(context)).isEqualTo("http get"); } @@ -77,10 +78,11 @@ class DefaultClientRequestObservationConventionTests { @Test void shouldAddKeyValuesForRequestWithUriTemplate() { - ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")) - .attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}").build(); + ClientRequest.Builder request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")) + .attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}"); ClientRequestObservationContext context = createContext(request); context.setUriTemplate("/resource/{id}"); + context.setBuiltRequest(context.getCarrier().build()); assertThat(this.observationConvention.getLowCardinalityKeyValues(context)) .contains(KeyValue.of("exception", "none"), KeyValue.of("method", "GET"), KeyValue.of("uri", "/resource/{id}"), KeyValue.of("status", "200"), KeyValue.of("outcome", "SUCCESS")); @@ -90,7 +92,8 @@ class DefaultClientRequestObservationConventionTests { @Test void shouldAddKeyValuesForRequestWithoutUriTemplate() { - ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")).build()); + ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))); + context.setBuiltRequest(context.getCarrier().build()); assertThat(this.observationConvention.getLowCardinalityKeyValues(context)) .contains(KeyValue.of("method", "GET"), KeyValue.of("uri", "none")); assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).hasSize(2).contains(KeyValue.of("http.url", "/resource/42")); @@ -98,11 +101,12 @@ class DefaultClientRequestObservationConventionTests { @Test void shouldAddClientNameKeyValueForRequestWithHost() { - ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")).build()); + ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42"))); + context.setBuiltRequest(context.getCarrier().build()); assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).contains(KeyValue.of("client.name", "localhost")); } - private ClientRequestObservationContext createContext(ClientRequest request) { + private ClientRequestObservationContext createContext(ClientRequest.Builder request) { ClientRequestObservationContext context = new ClientRequestObservationContext(); context.setCarrier(request); context.setResponse(ClientResponse.create(HttpStatus.OK).build()); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java index 79800678a9..d357a1bab1 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java @@ -17,7 +17,11 @@ package org.springframework.web.reactive.function.client; import java.time.Duration; +import java.util.Collections; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; import org.junit.jupiter.api.BeforeEach; @@ -28,6 +32,7 @@ import reactor.test.StepVerifier; import org.springframework.http.HttpStatus; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.when; @@ -57,17 +62,35 @@ class WebClientObservationTests { when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty()); given(this.exchangeFunction.exchange(this.request.capture())).willReturn(Mono.just(mockResponse)); this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction).observationRegistry(this.observationRegistry); + this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler()); } - @Test void recordsObservationForSuccessfulExchange() { this.builder.build().get().uri("/resource/{id}", 42) .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10)); - verifyAndGetRequest(); + + ClientRequest clientRequest = verifyAndGetRequest(); assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS") .hasLowCardinalityKeyValue("uri", "/resource/{id}"); + assertThat(clientRequest.headers()).containsEntry("foo", Collections.singletonList("bar")); + } + + @Test + void recordsObservationForSuccessfulExchangeWithParentObservationInReactorContext() { + Observation parent = Observation.start("parent", observationRegistry); + try { + this.builder.build().get().uri("/resource/{id}", 42) + .retrieve().bodyToMono(Void.class).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, parent)).block(Duration.ofSeconds(10)); + verifyAndGetRequest(); + + assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS") + .hasParentObservationEqualTo(parent); + } + finally { + parent.stop(); + } } @Test @@ -102,4 +125,17 @@ class WebClientObservationTests { return request.getValue(); } + static class HeaderInjectingHandler implements ObservationHandler { + + @Override + public void onStart(ClientRequestObservationContext context) { + context.getSetter().set(context.getCarrier(), "foo", "bar"); + } + + @Override + public boolean supportsContext(Observation.Context context) { + return context instanceof ClientRequestObservationContext; + } + } + }