WIP
This commit is contained in:
@@ -75,31 +75,85 @@ class TraceCqlSessionInterceptor implements MethodInterceptor {
|
||||
if (isContextUnusable()) {
|
||||
return invocation.proceed();
|
||||
}
|
||||
if (method.getName().equals("execute")) {
|
||||
if (args.length > 0) {
|
||||
return tracedCall(createStatement(args), "execute", this.delegate::execute);
|
||||
if (method.getName().equals("execute") && args.length > 0) {
|
||||
return observe(createStatement(args), method.getName(), this.delegate::execute);
|
||||
}
|
||||
|
||||
if (method.getName().equals("executeAsync") && args.length > 0) {
|
||||
return observe(createStatement(args), method.getName(), this.delegate::executeAsync);
|
||||
}
|
||||
// prepare calls do not notify RequestTracker so we need to stop the observation
|
||||
// ourselves
|
||||
if (method.getName().equals("prepare") && args.length > 0) {
|
||||
|
||||
Statement<?> statement = createStatement(args);
|
||||
|
||||
if (TraceStatement.isTraceStatement(statement)) {
|
||||
return this.delegate.prepareAsync((SimpleStatement) statement);
|
||||
}
|
||||
|
||||
Span span = cassandraClientSpan();
|
||||
Statement<?> proxied = TraceStatement.createProxy(span, statement);
|
||||
((CassandraSpanCustomizer) proxied).customizeSpan("prepare");
|
||||
try {
|
||||
return this.delegate.prepare((SimpleStatement) proxied);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
span.error(e);
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
if (method.getName().equals("executeAsync")) {
|
||||
if (args.length > 0) {
|
||||
return tracedCall(createStatement(args), "executeAsync", this.delegate::executeAsync);
|
||||
}
|
||||
}
|
||||
if (method.getName().equals("prepare")) {
|
||||
if (args.length > 0) {
|
||||
return tracedCall(createStatement(args), "prepare",
|
||||
statement -> this.delegate.prepare((SimpleStatement) statement));
|
||||
}
|
||||
}
|
||||
if (method.getName().equals("prepareAsync")) {
|
||||
if (args.length > 0) {
|
||||
return tracedCall(createStatement(args), "prepareAsync",
|
||||
statement -> this.delegate.prepareAsync((SimpleStatement) statement));
|
||||
if (method.getName().equals("prepareAsync") && args.length > 0) {
|
||||
Statement<?> statement = createStatement(args);
|
||||
|
||||
if (TraceStatement.isTraceStatement(statement)) {
|
||||
return this.delegate.prepareAsync((SimpleStatement) statement);
|
||||
}
|
||||
|
||||
Span span = cassandraClientSpan();
|
||||
Statement<?> proxied = TraceStatement.createProxy(span, statement);
|
||||
((CassandraSpanCustomizer) proxied).customizeSpan("prepareAsync");
|
||||
|
||||
return this.delegate.prepareAsync((SimpleStatement) proxied)
|
||||
.whenComplete((preparedStatement, throwable) -> {
|
||||
|
||||
if (throwable != null) {
|
||||
span.error(throwable);
|
||||
}
|
||||
|
||||
span.end();
|
||||
});
|
||||
}
|
||||
return invocation.proceed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Observe a {@link Statement}.
|
||||
* @param statement original CQL {@link Statement}
|
||||
* @param statementExecutor function that transforms a {@link Statement} into a
|
||||
* resulting {@link Object}
|
||||
* @return the statement execution result.
|
||||
*/
|
||||
private Object observe(Statement<?> statement, String methodName,
|
||||
Function<Statement<?>, Object> statementExecutor) {
|
||||
// avoid duplicate statement wrapping
|
||||
if (TraceStatement.isTraceStatement(statement)) {
|
||||
return statementExecutor.apply(statement);
|
||||
}
|
||||
Statement<?> proxied = getStatement(statement, methodName);
|
||||
return statementExecutor.apply(proxied);
|
||||
}
|
||||
|
||||
private Statement<?> getStatement(Statement<?> statement, String methodName) {
|
||||
Span span = cassandraClientSpan();
|
||||
Statement<?> proxied = TraceStatement.createProxy(span, statement);
|
||||
((CassandraSpanCustomizer) proxied).customizeSpan(methodName);
|
||||
return proxied;
|
||||
}
|
||||
|
||||
private static Statement<?> createStatement(Object[] args) {
|
||||
if (args[0] instanceof Statement) {
|
||||
return (Statement<?>) (args[0]);
|
||||
@@ -119,17 +173,6 @@ class TraceCqlSessionInterceptor implements MethodInterceptor {
|
||||
return ContextUtil.isContextUnusable(this.beanFactory);
|
||||
}
|
||||
|
||||
private Object tracedCall(Statement<?> statement, String defaultSpanName, Function<Statement<?>, Object> function) {
|
||||
Span span = cassandraClientSpan();
|
||||
Statement<?> proxied = TraceStatement.isTraceStatement(statement) ? statement
|
||||
: TraceStatement.createProxy(span, statement);
|
||||
((CassandraSpanCustomizer) proxied).customizeSpan(defaultSpanName);
|
||||
try (CurrentTraceContext.Scope ws = currentTraceContext().maybeScope(span.context())) {
|
||||
log.debug("Will execute statement");
|
||||
return function.apply(proxied);
|
||||
}
|
||||
}
|
||||
|
||||
private Span cassandraClientSpan() {
|
||||
return cassandraClientSpan(tracer().spanBuilder(), getSessionName(), getKeyspace());
|
||||
}
|
||||
@@ -156,11 +199,4 @@ class TraceCqlSessionInterceptor implements MethodInterceptor {
|
||||
return this.tracer;
|
||||
}
|
||||
|
||||
private CurrentTraceContext currentTraceContext() {
|
||||
if (this.currentTraceContext == null) {
|
||||
this.currentTraceContext = this.beanFactory.getBean(CurrentTraceContext.class);
|
||||
}
|
||||
return this.currentTraceContext;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user