From 7fcfd2654a1b9f81ac87b29ce91a60cbbc52bc9f Mon Sep 17 00:00:00 2001 From: Marcin Grzejszczak Date: Tue, 27 Jun 2023 16:24:05 +0200 Subject: [PATCH] WIP --- .../cassandra/TraceCqlSessionInterceptor.java | 108 ++++++++++++------ 1 file changed, 72 insertions(+), 36 deletions(-) diff --git a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/cassandra/TraceCqlSessionInterceptor.java b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/cassandra/TraceCqlSessionInterceptor.java index d06972e37..79630dafc 100644 --- a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/cassandra/TraceCqlSessionInterceptor.java +++ b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/cassandra/TraceCqlSessionInterceptor.java @@ -75,31 +75,85 @@ class TraceCqlSessionInterceptor implements MethodInterceptor { if (isContextUnusable()) { return invocation.proceed(); } - if (method.getName().equals("execute")) { - if (args.length > 0) { - return tracedCall(createStatement(args), "execute", this.delegate::execute); + if (method.getName().equals("execute") && args.length > 0) { + return observe(createStatement(args), method.getName(), this.delegate::execute); + } + + if (method.getName().equals("executeAsync") && args.length > 0) { + return observe(createStatement(args), method.getName(), this.delegate::executeAsync); + } + // prepare calls do not notify RequestTracker so we need to stop the observation + // ourselves + if (method.getName().equals("prepare") && args.length > 0) { + + Statement statement = createStatement(args); + + if (TraceStatement.isTraceStatement(statement)) { + return this.delegate.prepareAsync((SimpleStatement) statement); + } + + Span span = cassandraClientSpan(); + Statement proxied = TraceStatement.createProxy(span, statement); + ((CassandraSpanCustomizer) proxied).customizeSpan("prepare"); + try { + return this.delegate.prepare((SimpleStatement) proxied); + } + catch (RuntimeException e) { + span.error(e); + throw e; + } + finally { + span.end(); } } - if (method.getName().equals("executeAsync")) { - if (args.length > 0) { - return tracedCall(createStatement(args), "executeAsync", this.delegate::executeAsync); - } - } - if (method.getName().equals("prepare")) { - if (args.length > 0) { - return tracedCall(createStatement(args), "prepare", - statement -> this.delegate.prepare((SimpleStatement) statement)); - } - } - if (method.getName().equals("prepareAsync")) { - if (args.length > 0) { - return tracedCall(createStatement(args), "prepareAsync", - statement -> this.delegate.prepareAsync((SimpleStatement) statement)); + if (method.getName().equals("prepareAsync") && args.length > 0) { + Statement statement = createStatement(args); + + if (TraceStatement.isTraceStatement(statement)) { + return this.delegate.prepareAsync((SimpleStatement) statement); } + + Span span = cassandraClientSpan(); + Statement proxied = TraceStatement.createProxy(span, statement); + ((CassandraSpanCustomizer) proxied).customizeSpan("prepareAsync"); + + return this.delegate.prepareAsync((SimpleStatement) proxied) + .whenComplete((preparedStatement, throwable) -> { + + if (throwable != null) { + span.error(throwable); + } + + span.end(); + }); } return invocation.proceed(); } + /** + * Observe a {@link Statement}. + * @param statement original CQL {@link Statement} + * @param statementExecutor function that transforms a {@link Statement} into a + * resulting {@link Object} + * @return the statement execution result. + */ + private Object observe(Statement statement, String methodName, + Function, Object> statementExecutor) { + // avoid duplicate statement wrapping + if (TraceStatement.isTraceStatement(statement)) { + return statementExecutor.apply(statement); + } + Statement proxied = getStatement(statement, methodName); + return statementExecutor.apply(proxied); + } + + private Statement getStatement(Statement statement, String methodName) { + Span span = cassandraClientSpan(); + Statement proxied = TraceStatement.createProxy(span, statement); + ((CassandraSpanCustomizer) proxied).customizeSpan(methodName); + return proxied; + } + private static Statement createStatement(Object[] args) { if (args[0] instanceof Statement) { return (Statement) (args[0]); @@ -119,17 +173,6 @@ class TraceCqlSessionInterceptor implements MethodInterceptor { return ContextUtil.isContextUnusable(this.beanFactory); } - private Object tracedCall(Statement statement, String defaultSpanName, Function, Object> function) { - Span span = cassandraClientSpan(); - Statement proxied = TraceStatement.isTraceStatement(statement) ? statement - : TraceStatement.createProxy(span, statement); - ((CassandraSpanCustomizer) proxied).customizeSpan(defaultSpanName); - try (CurrentTraceContext.Scope ws = currentTraceContext().maybeScope(span.context())) { - log.debug("Will execute statement"); - return function.apply(proxied); - } - } - private Span cassandraClientSpan() { return cassandraClientSpan(tracer().spanBuilder(), getSessionName(), getKeyspace()); } @@ -156,11 +199,4 @@ class TraceCqlSessionInterceptor implements MethodInterceptor { return this.tracer; } - private CurrentTraceContext currentTraceContext() { - if (this.currentTraceContext == null) { - this.currentTraceContext = this.beanFactory.getBean(CurrentTraceContext.class); - } - return this.currentTraceContext; - } - }