Propagate SecurityContext into @Transactional methods. (#1979)
Closes #1944.
This commit is contained in:
@@ -18,6 +18,7 @@ package org.springframework.data.couchbase.transaction;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -89,6 +90,7 @@ public class CouchbaseCallbackTransactionManager implements CallbackPreferringPl
|
||||
@Stability.Internal
|
||||
<T> Flux<T> executeReactive(TransactionDefinition definition,
|
||||
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
|
||||
final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext()); // caller's resources
|
||||
return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> {
|
||||
boolean isInExistingTransaction = isInTransaction.isPresent();
|
||||
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
|
||||
@@ -100,17 +102,20 @@ public class CouchbaseCallbackTransactionManager implements CallbackPreferringPl
|
||||
} else {
|
||||
return Mono.error(new UnsupportedOperationException("Unsupported operation"));
|
||||
}
|
||||
});
|
||||
}).contextWrite( // set CouchbaseResourceHolder containing caller's SecurityContext
|
||||
ctx -> ctx.put(CouchbaseResourceHolder.class, couchbaseResourceHolder));
|
||||
}
|
||||
|
||||
private <T> T executeNewTransaction(TransactionCallback<T> callback) {
|
||||
final AtomicReference<T> execResult = new AtomicReference<>();
|
||||
final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext());
|
||||
|
||||
// Each of these transactions will block one thread on the underlying SDK's transactions scheduler. This
|
||||
// scheduler is effectively unlimited, but this can still potentially lead to high thread usage by the application.
|
||||
// If this is an issue then users need to instead use the standard Couchbase reactive transactions SDK.
|
||||
try {
|
||||
TransactionResult ignored = couchbaseClientFactory.getCluster().transactions().run(ctx -> {
|
||||
setSecurityContext(couchbaseResourceHolder.getSecurityContext()); // set the security context for the transaction
|
||||
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(ctx, true, false, false, true, null);
|
||||
|
||||
T res = callback.doInTransaction(status);
|
||||
@@ -173,12 +178,16 @@ public class CouchbaseCallbackTransactionManager implements CallbackPreferringPl
|
||||
}
|
||||
};
|
||||
|
||||
return Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v)).then(Mono.defer(() -> {
|
||||
if (status.isRollbackOnly()) {
|
||||
return Mono.error(new TransactionRollbackRequestedException("TransactionStatus.isRollbackOnly() is set"));
|
||||
}
|
||||
return Mono.empty();
|
||||
}));
|
||||
// Get caller's resources, set SecurityContext for the transaction
|
||||
return CouchbaseResourceOwner.get().map(cbrh -> setSecurityContext(cbrh.get().getSecurityContext()))
|
||||
.flatMap(ignore -> Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v))
|
||||
.then(Mono.defer(() -> {
|
||||
if (status.isRollbackOnly()) {
|
||||
return Mono.error(new TransactionRollbackRequestedException(
|
||||
"TransactionStatus.isRollbackOnly() is set"));
|
||||
}
|
||||
return Mono.empty();
|
||||
})));
|
||||
});
|
||||
|
||||
}, this.options).thenMany(Flux.defer(() -> Flux.fromIterable(out))).onErrorMap(ex -> {
|
||||
@@ -288,4 +297,26 @@ public class CouchbaseCallbackTransactionManager implements CallbackPreferringPl
|
||||
throw new UnsupportedOperationException(
|
||||
"Direct programmatic use of the Couchbase PlatformTransactionManager is not supported");
|
||||
}
|
||||
|
||||
static private Object getSecurityContext() {
|
||||
try {
|
||||
Class<?> securityContextHolderClass = Class
|
||||
.forName("org.springframework.security.core.context.SecurityContextHolder");
|
||||
return securityContextHolderClass.getMethod("getContext").invoke(null);
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
|
||||
| InvocationTargetException cnfe) {}
|
||||
return null;
|
||||
}
|
||||
|
||||
static private <S> S setSecurityContext(S sc) {
|
||||
try {
|
||||
Class<?> securityContextHolder = Class.forName("org.springframework.security.core.context.SecurityContext");
|
||||
Class<?> securityContextHolderClass = Class
|
||||
.forName("org.springframework.security.core.context.SecurityContextHolder");
|
||||
securityContextHolderClass.getMethod("setContext", new Class[] { securityContextHolder }).invoke(null, sc);
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
|
||||
| InvocationTargetException cnfe) {}
|
||||
return sc;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -34,6 +34,8 @@ import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
|
||||
public class CouchbaseResourceHolder extends ResourceHolderSupport {
|
||||
|
||||
private @Nullable CoreTransactionAttemptContext core; // which holds the atr
|
||||
private @Nullable Object securityContext; // SecurityContext. We don't have the class.
|
||||
|
||||
Map<Integer, Object> getResultMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
@@ -42,7 +44,17 @@ public class CouchbaseResourceHolder extends ResourceHolderSupport {
|
||||
* @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
|
||||
*/
|
||||
public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core) {
|
||||
this(core, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link CouchbaseResourceHolder} for a given {@link CoreTransactionAttemptContext session}.
|
||||
*
|
||||
* @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
|
||||
*/
|
||||
public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core, @Nullable Object securityContext) {
|
||||
this.core = core;
|
||||
this.securityContext = securityContext;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -53,6 +65,14 @@ public class CouchbaseResourceHolder extends ResourceHolderSupport {
|
||||
return core;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
|
||||
*/
|
||||
@Nullable
|
||||
public Object getSecurityContext() {
|
||||
return securityContext;
|
||||
}
|
||||
|
||||
public Object transactionResultHolder(Object holder, Object o) {
|
||||
getResultMap.put(System.identityHashCode(o), holder);
|
||||
return holder;
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package org.springframework.data.couchbase.transaction;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.couchbase.client.core.annotation.Stability.Internal;
|
||||
|
||||
@Internal
|
||||
public class CouchbaseResourceOwner {
|
||||
private static final ThreadLocal<CouchbaseResourceHolder> marker = new ThreadLocal();
|
||||
|
||||
public CouchbaseResourceOwner() {}
|
||||
|
||||
public static void set(CouchbaseResourceHolder toInject) {
|
||||
if (marker.get() != null) {
|
||||
throw new IllegalStateException(
|
||||
"Trying to set resource holder when already inside a transaction - likely an internal bug, please report it");
|
||||
} else {
|
||||
marker.set(toInject);
|
||||
}
|
||||
}
|
||||
|
||||
public static void clear() {
|
||||
marker.remove();
|
||||
}
|
||||
|
||||
public static Mono<Optional<CouchbaseResourceHolder>> get() {
|
||||
return Mono.deferContextual((ctx) -> {
|
||||
CouchbaseResourceHolder fromThreadLocal = marker.get();
|
||||
CouchbaseResourceHolder fromReactive = ctx.hasKey(CouchbaseResourceHolder.class)
|
||||
? ctx.get(CouchbaseResourceHolder.class)
|
||||
: null;
|
||||
if (fromThreadLocal != null) {
|
||||
return Mono.just(Optional.of(fromThreadLocal));
|
||||
} else {
|
||||
return fromReactive != null ? Mono.just(Optional.of(fromReactive)) : Mono.just(Optional.empty());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@ package org.springframework.data.couchbase.transactions;
|
||||
|
||||
import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS;
|
||||
|
||||
import org.springframework.data.couchbase.util.Util;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
@@ -119,6 +120,7 @@ public class CouchbasePersonTransactionReactiveIntegrationTests extends JavaInte
|
||||
@Test
|
||||
public void commitShouldPersistTxEntries() {
|
||||
|
||||
System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext()));
|
||||
personService.savePerson(WalterWhite) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNextCount(1) //
|
||||
@@ -130,6 +132,17 @@ public class CouchbasePersonTransactionReactiveIntegrationTests extends JavaInte
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void commitShouldPersistTxEntriesBlocking() {
|
||||
System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext()));
|
||||
Person p = personService.savePersonBlocking(WalterWhite);
|
||||
|
||||
operations.findByQuery(Person.class).withConsistency(REQUEST_PLUS).count() //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(1L) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() {
|
||||
|
||||
|
||||
@@ -21,6 +21,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import org.springframework.data.couchbase.core.TransactionalSupport;
|
||||
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
|
||||
import org.springframework.data.couchbase.transaction.CouchbaseResourceHolder;
|
||||
import org.springframework.data.couchbase.util.Util;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -31,6 +33,9 @@ import org.springframework.data.couchbase.domain.Person;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.reactive.TransactionalOperator;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* reactive PersonService for tests
|
||||
*
|
||||
@@ -57,8 +62,15 @@ class PersonServiceReactive {
|
||||
.<Person> flatMap(it -> Mono.error(new SimulateFailureException()));
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Person savePersonBlocking(Person person) {
|
||||
System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext()));
|
||||
return personOperations.insertById(Person.class).one(person);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Person> savePerson(Person person) {
|
||||
System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext()));
|
||||
return TransactionalSupport.checkForTransactionInThreadLocalStorage().map(stat -> {
|
||||
assertTrue(stat.isPresent(), "Not in transaction");
|
||||
System.err.println("In a transaction!!");
|
||||
|
||||
@@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.awaitility.Awaitility.with;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
@@ -170,4 +171,26 @@ public class Util {
|
||||
+ " but expected in-annotation-transaction = " + inTransaction);
|
||||
}
|
||||
|
||||
static public Object getSecurityContext(){
|
||||
Object sc = null;
|
||||
try {
|
||||
Class<?> securityContextHolderClass = Class
|
||||
.forName("org.springframework.security.core.context.SecurityContextHolder");
|
||||
sc = securityContextHolderClass.getMethod("getContext").invoke(null);
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
|
||||
| InvocationTargetException cnfe) {}
|
||||
System.err.println(Thread.currentThread().getName() +" Util.get "+ System.identityHashCode(sc));
|
||||
return sc;
|
||||
}
|
||||
|
||||
static public void setSecurityContext(Object sc) {
|
||||
System.err.println(Thread.currentThread().getName() +" Util.set "+ System.identityHashCode(sc));
|
||||
try {
|
||||
Class<?> securityContextHolderClass = Class
|
||||
.forName("org.springframework.security.core.context.SecurityContextHolder");
|
||||
sc = securityContextHolderClass.getMethod("setContext", new Class[]{securityContextHolderClass}).invoke(sc);
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
|
||||
| InvocationTargetException cnfe) {}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user