Add observability adapter for Lettuce.

We now provide MicrometerTracingAdapter to connect Lettuce to Micrometer Tracing.

Original pull request: #2439
Closes #2348
This commit is contained in:
Mark Paluch
2022-10-20 10:33:58 +02:00
parent 190b28cbbe
commit ccdd5d2d98
13 changed files with 1081 additions and 1 deletions

View File

@@ -0,0 +1,81 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce.observability;
import java.net.InetSocketAddress;
import java.util.Locale;
import org.springframework.data.redis.connection.lettuce.observability.RedisObservation.HighCardinalityCommandKeyNames;
import org.springframework.data.redis.connection.lettuce.observability.RedisObservation.LowCardinalityCommandKeyNames;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.tracing.Tracing.Endpoint;
import io.micrometer.common.KeyValues;
/**
* Default {@link LettuceObservationConvention} implementation.
*
* @author Mark Paluch
* @since 3.0
*/
record DefaultLettuceObservationConvention(
boolean includeCommandArgsInSpanTags) implements LettuceObservationConvention {
@Override
public KeyValues getLowCardinalityKeyValues(LettuceObservationContext context) {
Endpoint ep = context.getRequiredEndpoint();
KeyValues keyValues = KeyValues.of(LowCardinalityCommandKeyNames.DATABASE_SYSTEM.withValue("redis"), //
LowCardinalityCommandKeyNames.REDIS_COMMAND.withValue(context.getRequiredCommand().getType().name()));
if (ep instanceof SocketAddressEndpoint endpoint) {
if (endpoint.socketAddress()instanceof InetSocketAddress inet) {
keyValues = keyValues
.and(KeyValues.of(LowCardinalityCommandKeyNames.NET_SOCK_PEER_ADDR.withValue(inet.getHostString()),
LowCardinalityCommandKeyNames.NET_SOCK_PEER_PORT.withValue("" + inet.getPort()),
LowCardinalityCommandKeyNames.NET_TRANSPORT.withValue("IP.TCP")));
} else {
keyValues = keyValues
.and(KeyValues.of(LowCardinalityCommandKeyNames.NET_PEER_NAME.withValue(endpoint.toString()),
LowCardinalityCommandKeyNames.NET_TRANSPORT.withValue("Unix")));
}
}
return keyValues;
}
@Override
public KeyValues getHighCardinalityKeyValues(LettuceObservationContext context) {
RedisCommand<?, ?, ?> command = context.getRequiredCommand();
if (includeCommandArgsInSpanTags) {
if (command.getArgs() != null) {
return KeyValues.of(HighCardinalityCommandKeyNames.STATEMENT
.withValue(command.getType().name() + " " + command.getArgs().toCommandString()));
}
}
return KeyValues.empty();
}
@Override
public String getContextualName(LettuceObservationContext context) {
return context.getRequiredCommand().getType().name().toLowerCase(Locale.ROOT);
}
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce.observability;
import org.springframework.lang.Nullable;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.tracing.Tracing.Endpoint;
import io.micrometer.observation.Observation;
import io.micrometer.observation.transport.Kind;
import io.micrometer.observation.transport.SenderContext;
/**
* Micrometer {@link Observation.Context} holding Lettuce contextual details.
*
* @author Mark Paluch
* @since 3.0
*/
class LettuceObservationContext extends SenderContext<Object> {
private volatile @Nullable RedisCommand<?, ?, ?> command;
private volatile @Nullable Endpoint endpoint;
public LettuceObservationContext(String serviceName) {
super((carrier, key, value) -> {}, Kind.CLIENT);
setRemoteServiceName(serviceName);
}
public RedisCommand<?, ?, ?> getRequiredCommand() {
RedisCommand<?, ?, ?> local = command;
if (local == null) {
throw new IllegalArgumentException("LettuceObservationContext is not associated with a Command");
}
return local;
}
public void setCommand(RedisCommand<?, ?, ?> command) {
this.command = command;
}
public Endpoint getRequiredEndpoint() {
Endpoint local = endpoint;
if (local == null) {
throw new IllegalArgumentException("LettuceObservationContext is not associated with a Endpoint");
}
return local;
}
public void setEndpoint(Endpoint endpoint) {
this.endpoint = endpoint;
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce.observability;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationConvention;
/**
* {@link ObservationConvention} for {@link LettuceObservationContext}.
*
* @author Mark Paluch
* @since 3.0
*/
interface LettuceObservationConvention extends ObservationConvention<LettuceObservationContext> {
@Override
default boolean supportsContext(Observation.Context context) {
return context instanceof LettuceObservationContext;
}
}

View File

@@ -0,0 +1,335 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce.observability;
import java.net.SocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.data.redis.connection.lettuce.observability.RedisObservation.HighCardinalityCommandKeyNames;
import org.springframework.lang.Nullable;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.tracing.TraceContext;
import io.lettuce.core.tracing.TraceContextProvider;
import io.lettuce.core.tracing.Tracer;
import io.lettuce.core.tracing.Tracer.Span;
import io.lettuce.core.tracing.TracerProvider;
import io.lettuce.core.tracing.Tracing;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.core.publisher.Mono;
/**
* {@link Tracing} adapter using Micrometer's {@link Observation}. This adapter integrates with Micrometer to propagate
* observations into timers, distributed traces and any other registered handlers. Observations include a set of tags
* capturing Redis runtime information.
* <h3>Capturing full statements</h3> This adapter can capture full statements when enabling
* {@code includeCommandArgsInSpanTags}. You should carefully consider the impact of this setting as all command
* arguments will be captured in traces including these that may contain sensitive details.
*
* @author Mark Paluch
* @since 3.0
*/
public class MicrometerTracingAdapter implements Tracing {
private static final Log log = LogFactory.getLog(MicrometerTracingAdapter.class);
private final ObservationRegistry observationRegistry;
private final String serviceName;
private final boolean includeCommandArgsInSpanTags;
private final LettuceObservationConvention observationConvention;
/**
* Create a new {@link MicrometerTracingAdapter} instance.
*
* @param observationRegistry must not be {@literal null}.
* @param serviceName service name to be used.
*/
public MicrometerTracingAdapter(ObservationRegistry observationRegistry, String serviceName) {
this(observationRegistry, serviceName, false);
}
/**
* Create a new {@link MicrometerTracingAdapter} instance.
*
* @param observationRegistry must not be {@literal null}.
* @param serviceName service name to be used.
* @param includeCommandArgsInSpanTags whether to attach the full command into the trace. Use this flag with caution
* as sensitive arguments will be captured in the observation spans and metric tags.
*/
public MicrometerTracingAdapter(ObservationRegistry observationRegistry, String serviceName,
boolean includeCommandArgsInSpanTags) {
this.observationRegistry = observationRegistry;
this.serviceName = serviceName;
this.observationConvention = new DefaultLettuceObservationConvention(includeCommandArgsInSpanTags);
this.includeCommandArgsInSpanTags = includeCommandArgsInSpanTags;
}
@Override
public TracerProvider getTracerProvider() {
return () -> new MicrometerTracer(observationRegistry);
}
@Override
public TraceContextProvider initialTraceContextProvider() {
return new MicrometerTraceContextProvider(observationRegistry);
}
@Override
public boolean isEnabled() {
return true;
}
@Override
public boolean includeCommandArgsInSpanTags() {
return includeCommandArgsInSpanTags;
}
@Override
public Endpoint createEndpoint(SocketAddress socketAddress) {
return new SocketAddressEndpoint(socketAddress);
}
/**
* {@link Tracer} implementation based on Micrometer's {@link ObservationRegistry}.
*/
class MicrometerTracer extends Tracer {
private final ObservationRegistry observationRegistry;
public MicrometerTracer(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}
@Override
public Tracer.Span nextSpan() {
return this.postProcessSpan(createObservation());
}
@Override
public Tracer.Span nextSpan(TraceContext traceContext) {
if (traceContext instanceof MicrometerTraceContext micrometerTraceContext) {
return micrometerTraceContext.observation == null ? nextSpan()
: postProcessSpan(createObservation().parentObservation(micrometerTraceContext.observation()));
}
return nextSpan();
}
private Observation createObservation() {
return RedisObservation.REDIS_COMMAND_OBSERVATION.observation(observationRegistry,
() -> new LettuceObservationContext(serviceName));
}
private Tracer.Span postProcessSpan(Observation observation) {
return observation != null && !observation.isNoop()
? new MicrometerSpan(observation.observationConvention(observationConvention))
: NoOpSpan.INSTANCE;
}
}
/**
* No-op {@link Span} implemementation.
*/
static class NoOpSpan extends Tracer.Span {
static final NoOpSpan INSTANCE = new NoOpSpan();
public NoOpSpan() {}
@Override
public Tracer.Span start(RedisCommand<?, ?, ?> command) {
return this;
}
@Override
public Tracer.Span name(String name) {
return this;
}
@Override
public Tracer.Span annotate(String value) {
return this;
}
@Override
public Tracer.Span tag(String key, String value) {
return this;
}
@Override
public Tracer.Span error(Throwable throwable) {
return this;
}
@Override
public Tracer.Span remoteEndpoint(Tracing.Endpoint endpoint) {
return this;
}
@Override
public void finish() {}
}
/**
* Micrometer {@link Observation}-based {@link Span} implementation.
*/
static class MicrometerSpan extends Tracer.Span {
private final Observation observation;
private @Nullable RedisCommand<?, ?, ?> command;
public MicrometerSpan(Observation observation) {
this.observation = observation;
}
@Override
public Span start(RedisCommand<?, ?, ?> command) {
((LettuceObservationContext) observation.getContext()).setCommand(command);
this.command = command;
if (log.isDebugEnabled()) {
log.debug(String.format("Starting Observation for Command %s", command));
}
if (command instanceof CompleteableCommand<?> completeableCommand) {
completeableCommand.onComplete((o, throwable) -> {
if (command.getOutput() != null) {
String error = command.getOutput().getError();
if (error != null) {
observation.highCardinalityKeyValue(HighCardinalityCommandKeyNames.ERROR.withValue(error));
} else if (throwable != null) {
error(throwable);
}
}
finish();
});
} else {
throw new IllegalArgumentException("Command " + command
+ " must implement CompleteableCommand to attach Span completion to command completion");
}
observation.start();
return this;
}
@Override
public Span name(String name) {
return this;
}
@Override
public Span annotate(String annotation) {
return this;
}
@Override
public Span tag(String key, String value) {
observation.highCardinalityKeyValue(key, value);
return this;
}
@Override
public Span error(Throwable throwable) {
if (log.isDebugEnabled()) {
log.debug(String.format("Attaching error to Observation for Command %s", command));
}
observation.error(throwable);
return this;
}
@Override
public Span remoteEndpoint(Endpoint endpoint) {
((LettuceObservationContext) observation.getContext()).setEndpoint(endpoint);
return this;
}
@Override
public void finish() {
if (log.isDebugEnabled()) {
log.debug(String.format("Stopping Observation for Command %s", command));
}
observation.stop();
}
}
/**
* {@link TraceContextProvider} using {@link ObservationRegistry}.
*/
record MicrometerTraceContextProvider(ObservationRegistry registry) implements TraceContextProvider {
@Override
public TraceContext getTraceContext() {
Observation observation = registry.getCurrentObservation();
if (observation == null) {
return null;
}
return new MicrometerTraceContext(observation);
}
@Override
public Mono<TraceContext> getTraceContextLater() {
return Mono.deferContextual(Mono::justOrEmpty).filter((it) -> {
return it.hasKey(TraceContext.class) || it.hasKey(Observation.class)
|| it.hasKey(ObservationThreadLocalAccessor.KEY);
}).map((it) -> {
if (it.hasKey(Observation.class)) {
return new MicrometerTraceContext(it.get(Observation.class));
}
if (it.hasKey(TraceContext.class)) {
return it.get(TraceContext.class);
}
return new MicrometerTraceContext(it.get(ObservationThreadLocalAccessor.KEY));
});
}
}
/**
* {@link TraceContext} implementation using {@link Observation}.
*
* @param observation
*/
record MicrometerTraceContext(Observation observation) implements TraceContext {
}
}

View File

@@ -0,0 +1,172 @@
/*
* Copyright 2013-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce.observability;
import io.micrometer.common.docs.KeyName;
import io.micrometer.observation.docs.ObservationDocumentation;
/**
* A Redis-based {@link io.micrometer.observation.Observation}.
*
* @author Mark Paluch
* @since 3.0
*/
enum RedisObservation implements ObservationDocumentation {
/**
* Timer created around a Redis command execution.
*/
REDIS_COMMAND_OBSERVATION {
@Override
public String getName() {
return "spring.data.redis";
}
@Override
public KeyName[] getLowCardinalityKeyNames() {
return LowCardinalityCommandKeyNames.values();
}
@Override
public KeyName[] getHighCardinalityKeyNames() {
return HighCardinalityCommandKeyNames.values();
}
};
/**
* Enums related to low cardinality key names for Redis commands.
*/
enum LowCardinalityCommandKeyNames implements KeyName {
/**
* Database system.
*/
DATABASE_SYSTEM {
@Override
public String asString() {
return "db.system";
}
},
/**
* Network transport.
*/
NET_TRANSPORT {
@Override
public String asString() {
return "net.transport";
}
},
/**
* Name of the database host.
*/
NET_PEER_NAME {
@Override
public String asString() {
return "net.peer.name";
}
},
/**
* Logical remote port number.
*/
NET_PEER_PORT {
@Override
public String asString() {
return "net.peer.port";
}
},
/**
* Mongo peer address.
*/
NET_SOCK_PEER_ADDR {
@Override
public String asString() {
return "net.sock.peer.addr";
}
},
/**
* Mongo peer port.
*/
NET_SOCK_PEER_PORT {
@Override
public String asString() {
return "net.sock.peer.port";
}
},
/**
* Redis user.
*/
DB_USER {
@Override
public String asString() {
return "db.user";
}
},
/**
* Redis database index.
*/
DB_INDEX {
@Override
public String asString() {
return "db.redis.database_index";
}
},
/**
* Redis command value.
*/
REDIS_COMMAND {
@Override
public String asString() {
return "db.operation";
}
}
}
/**
* Enums related to high cardinality key names for Redis commands.
*/
enum HighCardinalityCommandKeyNames implements KeyName {
/**
* Redis statement.
*/
STATEMENT {
@Override
public String asString() {
return "db.statement";
}
},
/**
* Redis error response.
*/
ERROR {
@Override
public String asString() {
return "spring.data.redis.command.error";
}
}
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce.observability;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.lettuce.core.tracing.Tracing.Endpoint;
/**
* @author Mark Paluch
*/
record SocketAddressEndpoint(SocketAddress socketAddress) implements Endpoint {
@Override
public String toString() {
if (socketAddress instanceof InetSocketAddress inet) {
return inet.getHostString() + ":" + inet.getPort();
}
return socketAddress.toString();
}
}

View File

@@ -0,0 +1,6 @@
/**
* Integration of Micrometer Tracing for Lettuce Observability.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.data.redis.connection.lettuce.observability;