SGF-894 - Add tests for Spring @TransactionalEventListener annotated POJO methods in the context of Apache Geode, Local Cache Transactions.

This commit is contained in:
John Blum
2019-11-11 19:50:58 -08:00
parent 7f8022f2f2
commit 7ab246846c
3 changed files with 414 additions and 48 deletions

View File

@@ -12,9 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.data.gemfire.transaction;
import static org.springframework.data.gemfire.transaction.GemfireTransactionManager.CacheHolder.newCacheHolder;
@@ -89,8 +87,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
/**
* Constructs an instance of the {@link GemfireTransactionManager}.
*/
public GemfireTransactionManager() {
}
public GemfireTransactionManager() { }
/**
* Constructs an instance of the {@link GemfireTransactionManager} initialized with
@@ -101,7 +98,9 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
* @see #afterPropertiesSet()
*/
public GemfireTransactionManager(GemFireCache cache) {
this.cache = cache;
afterPropertiesSet();
}
@@ -110,7 +109,9 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
*/
@Override
public void afterPropertiesSet() {
Assert.notNull(this.cache, "Cache is required");
this.cache.setCopyOnRead(isCopyOnRead());
}
@@ -136,8 +137,11 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
try {
CacheTransactionObject cacheTransaction = (CacheTransactionObject) transaction;
GemFireCache cache = getCache();
if (logger.isDebugEnabled()) {
@@ -156,10 +160,10 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
cacheTransaction.setAndGetHolder(newCacheHolder(transactionId)));
}
}
catch (IllegalStateException e) {
catch (Exception cause) {
throw new CannotCreateTransactionException(String.format("%1$s; %2$s",
"An existing, ongoing transaction is already associated with the current thread",
"are multiple transaction managers present?"), e);
"An existing, ongoing transaction is already associated with the current thread.",
" Are multiple transaction managers present?"), cause);
}
}
@@ -168,6 +172,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
*/
@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
try {
if (status.isDebug()) {
logger.debug("Committing local cache transaction");
@@ -175,13 +180,13 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
getCacheTransactionManager().commit();
}
catch (IllegalStateException e) {
throw new NoTransactionException(
"No transaction is associated with the current thread; are multiple transaction managers present?", e);
}
catch (org.apache.geode.cache.TransactionException e) {
catch (org.apache.geode.cache.TransactionException cause) {
throw new GemfireTransactionCommitException(
"Unexpected failure occurred on commit of local cache transaction", e);
"Unexpected failure occurred on commit of local cache transaction", cause);
}
catch (Exception cause) {
throw new NoTransactionException(
"No transaction is associated with the current thread; are multiple transaction managers present?", cause);
}
}
@@ -190,6 +195,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
*/
@Override
protected Object doSuspend(Object transaction) throws TransactionException {
if (getCacheTransactionManager().suspend() != null) {
TransactionSynchronizationManager.unbindResource(getCache());
return ((CacheTransactionObject) transaction).setAndGetExistingHolder(null);
@@ -203,13 +209,14 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
*/
@Override
protected void doResume(Object transaction, Object suspendedResources) throws TransactionException {
if (suspendedResources instanceof CacheHolder) {
CacheHolder holder = (CacheHolder) suspendedResources;
boolean resumeSuccessful = (isResumeWaitTimeSet()
? getCacheTransactionManager().tryResume(holder.getTransactionId(),
getResumeWaitTime(), getResumeWaitTimeUnit())
: getCacheTransactionManager().tryResume(holder.getTransactionId()));
boolean resumeSuccessful = isResumeWaitTimeSet()
? getCacheTransactionManager().tryResume(holder.getTransactionId(), getResumeWaitTime(), getResumeWaitTimeUnit())
: getCacheTransactionManager().tryResume(holder.getTransactionId());
if (resumeSuccessful) {
TransactionSynchronizationManager.bindResource(getCache(),
@@ -223,6 +230,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
*/
@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
try {
if (status.isDebug()) {
logger.debug("Rolling back local cache transaction");
@@ -230,9 +238,12 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
getCacheTransactionManager().rollback();
}
catch (IllegalStateException e) {
throw new NoTransactionException(
"No transaction is associated with the current thread; are multiple transaction managers present?", e);
catch (Exception cause) {
String exceptionMessage =
"No transaction is associated with the current thread. Are multiple transaction managers present?";
throw new NoTransactionException(exceptionMessage, cause);
}
}
@@ -311,7 +322,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
* @see #setCopyOnRead(boolean)
*/
public boolean isCopyOnRead() {
return copyOnRead;
return this.copyOnRead;
}
/**
@@ -324,7 +335,9 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
* @see org.apache.geode.cache.Region
*/
public <K, V> void setRegion(Region<K, V> region) {
Assert.notNull(region, "Region must not be null");
this.cache = (GemFireCache) region.getRegionService();
}
@@ -365,8 +378,10 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
* @see #getResumeWaitTime()
*/
protected boolean isResumeWaitTimeSet() {
Long resumeWaitTime = getResumeWaitTime();
return (resumeWaitTime != null && resumeWaitTime > 0);
return resumeWaitTime != null && resumeWaitTime > 0;
}
/**
@@ -401,40 +416,34 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
private CacheHolder cacheHolder;
/* (non-Javadoc) */
static CacheTransactionObject newCacheTransactionObject(CacheHolder cacheHolder) {
CacheTransactionObject transactionObject = new CacheTransactionObject();
transactionObject.setHolder(cacheHolder);
return transactionObject;
}
/* (non-Javadoc) */
boolean isHolding() {
return (getHolder() != null);
}
/* (non-Javadoc) */
CacheHolder getHolder() {
return this.cacheHolder;
}
/* (non-Javadoc) */
void setHolder(CacheHolder holder) {
this.cacheHolder = holder;
}
/* (non-Javadoc) */
CacheHolder setAndGetExistingHolder(CacheHolder cacheHolder) {
CacheHolder existingHolder = getHolder();
setHolder(cacheHolder);
return existingHolder;
}
/* (non-Javadoc) */
CacheHolder setAndGetHolder(CacheHolder holder) {
setHolder(holder);
return getHolder();
}
void setHolder(CacheHolder cacheHolder) {
this.cacheHolder = cacheHolder;
}
CacheHolder getHolder() {
return this.cacheHolder;
}
boolean isHolding() {
return getHolder() != null;
}
}
/**
@@ -446,24 +455,20 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage
private TransactionId transactionId;
/* (non-Javadoc) */
static CacheHolder newCacheHolder(TransactionId transactionId) {
CacheHolder cacheHolder = new CacheHolder();
cacheHolder.transactionId = transactionId;
return cacheHolder;
}
/* (non-Javadoc) */
boolean isRollbackOnly() {
return this.rollbackOnly;
}
/* (non-Javadoc) */
void setRollbackOnly() {
this.rollbackOnly = true;
}
/* (non-Javadoc) */
boolean isRollbackOnly() {
return this.rollbackOnly;
}
TransactionId getTransactionId() {
return this.transactionId;
}

View File

@@ -0,0 +1,76 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.gemfire.transaction;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.StringUtils;
/**
* The {@link TransactionApplicationEvent} is an implementation of {@link ApplicationEvent} which is fired during
* a transaction.
*
* @author John Blum
* @see java.time.Instant
* @see java.time.LocalDateTime
* @see org.springframework.context.ApplicationEvent
* @since 2.3.0
*/
public class TransactionApplicationEvent extends ApplicationEvent {
protected static final String TIMESTAMP_PATTERN = "yyyy-MM-dd-hh:mm:ss.S";
private String details;
public TransactionApplicationEvent(Object source) {
this(source, null);
}
public TransactionApplicationEvent(Object source, String details) {
super(source);
this.details = details;
}
public Optional<String> getDetails() {
return Optional.ofNullable(this.details).filter(StringUtils::hasText);
}
public LocalDateTime getTimestampAsLocalDateTime() {
return LocalDateTime.from(Instant.ofEpochMilli(getTimestamp()));
}
public String getTimestampAsString() {
return getTimestampAsString(TIMESTAMP_PATTERN);
}
public String getTimestampAsString(String pattern) {
return getTimestampAsLocalDateTime().format(DateTimeFormatter.ofPattern(pattern));
}
@Override
public String toString() {
return getDetails()
.map(details -> String.format("%s - %s", getTimestampAsString(), details))
.orElse(String.format("%s[%s]", getClass().getSimpleName(), getTimestampAsString()));
}
}

View File

@@ -0,0 +1,285 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.gemfire.transaction;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.data.gemfire.util.RuntimeExceptionFactory.newIllegalStateException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.data.annotation.Id;
import org.springframework.data.gemfire.config.annotation.ClientCacheApplication;
import org.springframework.data.gemfire.config.annotation.EnableEntityDefinedRegions;
import org.springframework.data.gemfire.mapping.GemfireMappingContext;
import org.springframework.data.gemfire.mapping.annotation.Region;
import org.springframework.data.gemfire.repository.support.GemfireRepositoryFactoryBean;
import org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.util.Assert;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
/**
* Integration Tests for the Spring {@link TransactionalEventListener} in the context of Apache Geode
* cache transactions.
*
* @author John Blum
* @see org.junit.Test
* @see org.springframework.context.ApplicationEvent
* @see org.springframework.context.ApplicationEventPublisher
* @see org.springframework.context.annotation.Bean
* @see org.springframework.data.gemfire.config.annotation.ClientCacheApplication
* @see org.springframework.data.gemfire.transaction.TransactionApplicationEvent
* @see org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions
* @see org.springframework.test.context.ContextConfiguration
* @see org.springframework.test.context.junit4.SpringRunner
* @see org.springframework.transaction.annotation.Transactional
* @see org.springframework.transaction.event.TransactionPhase
* @see org.springframework.transaction.event.TransactionalEventListener
* @since 2.3.0
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@SuppressWarnings("unused")
public class TransactionalEventListenerIntegrationTests {
private static final String GEMFIRE_LOG_LEVEL = "error";
@Autowired
private CustomerService customerService;
@Autowired
private TestTransactionEventListener transactionEventListener;
@Test
public void successfulEntityTransactionTriggersCommitTransactionEvents() {
Customer jonDoe = this.customerService.save(Customer.newCustomer(1L, "Jon Doe"));
Customer jonDoeLoaded = this.customerService.findById(jonDoe.getId());
assertThat(jonDoeLoaded).isEqualTo(jonDoe);
assertThat(this.transactionEventListener.getAndClearTransactionDetails()).containsExactly("1");
assertThat(this.transactionEventListener.getAndClearTransactionPhases())
.containsExactly(TransactionPhase.BEFORE_COMMIT, TransactionPhase.AFTER_COMMIT);
}
@Test(expected = IllegalStateException.class)
public void failingEntityTransactionTriggersRollbackTransactionEvent() {
Customer janeDoe = Customer.newCustomer(2L, "Jane Doe");
try {
this.customerService.saveFailsAndRollsback(janeDoe);
}
catch (IllegalStateException expected) {
assertThat(this.transactionEventListener.getAndClearTransactionDetails()).containsExactly("2");
assertThat(this.transactionEventListener.getAndClearTransactionPhases())
.containsExactly(TransactionPhase.AFTER_ROLLBACK);
assertThat(expected).hasMessage("TEST");
assertThat(expected).hasNoCause();
try {
this.customerService.findById(janeDoe.getId());
}
catch (IllegalStateException alsoExpected) {
assertThat(alsoExpected).hasMessage("No Customer having ID [2] was found");
assertThat(alsoExpected).hasNoCause();
throw alsoExpected;
}
}
}
@ClientCacheApplication(logLevel = GEMFIRE_LOG_LEVEL)
@EnableEntityDefinedRegions(
basePackageClasses = Customer.class,
clientRegionShortcut = ClientRegionShortcut.LOCAL
)
@EnableGemfireCacheTransactions
static class TestConfiguration {
@Bean
GemfireRepositoryFactoryBean<CustomerRepository, Customer, Long> customerRepositoryFactoryBean() {
GemfireRepositoryFactoryBean<CustomerRepository, Customer, Long> customerRepositoryFactoryBean
= new GemfireRepositoryFactoryBean<>(CustomerRepository.class);
customerRepositoryFactoryBean.setGemfireMappingContext(new GemfireMappingContext());
return customerRepositoryFactoryBean;
}
@Bean
CustomerService customerService(ApplicationEventPublisher eventPublisher,
CustomerRepository customerRepository) {
return new CustomerService(eventPublisher, customerRepository);
}
@Bean
TestTransactionEventListener testTransactionEventListener() {
return new TestTransactionEventListener();
}
}
@Data
@EqualsAndHashCode
@Region("Customers")
@RequiredArgsConstructor(staticName = "newCustomer")
static class Customer implements Serializable {
@Id @NonNull
private Long id;
@NonNull
private String name;
}
public interface CustomerRepository extends CrudRepository<Customer, Long> { }
@Service
public static class CustomerService {
private ApplicationEventPublisher eventPublisher;
private final CustomerRepository customerRepository;
public CustomerService(ApplicationEventPublisher eventPublisher, CustomerRepository customerRepository) {
Assert.notNull(eventPublisher, "ApplicationEventPublisher is required");
Assert.notNull(customerRepository, "CustomerRepository is required");
this.eventPublisher = eventPublisher;
this.customerRepository = customerRepository;
}
public Customer findById(Long id) {
return this.customerRepository.findById(id)
.orElseThrow(() -> newIllegalStateException("No Customer having ID [%d] was found", id));
}
@Transactional
public Customer save(Customer customer) {
customer = this.customerRepository.save(customer);
this.eventPublisher.publishEvent(new TransactionApplicationEvent(this,
String.valueOf(customer.getId())));
return customer;
}
@Transactional
public Customer saveFailsAndRollsback(Customer customer) {
this.customerRepository.save(customer);
this.eventPublisher.publishEvent(new TransactionApplicationEvent(this,
String.valueOf(customer.getId())));
throw newIllegalStateException("TEST");
}
}
@Component
public static class TestTransactionEventListener {
private final List<TransactionPhase> transactionPhases = new ArrayList<>();
private final Set<String> transactionDetails = new HashSet<>();
public Set<String> getAndClearTransactionDetails() {
Set<String> copy = new HashSet<>(this.transactionDetails);
this.transactionDetails.clear();
return copy;
}
public List<TransactionPhase> getAndClearTransactionPhases() {
List<TransactionPhase> copy = new ArrayList<>(this.transactionPhases);
this.transactionPhases.clear();
return copy;
}
private void appendTransactionPhase(TransactionPhase transactionPhase) {
Optional.ofNullable(transactionPhase).ifPresent(this.transactionPhases::add);
}
private void extractTransactionDetails(ApplicationEvent event) {
Optional.ofNullable(event)
.filter(TransactionApplicationEvent.class::isInstance)
.map(TransactionApplicationEvent.class::cast)
.flatMap(TransactionApplicationEvent::getDetails)
.ifPresent(this.transactionDetails::add);
}
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void handleTransactionBeforeCommit(ApplicationEvent event) {
appendTransactionPhase(TransactionPhase.BEFORE_COMMIT);
extractTransactionDetails(event);
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleTransactionAfterCommit(ApplicationEvent event) {
appendTransactionPhase(TransactionPhase.AFTER_COMMIT);
extractTransactionDetails(event);
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleTransactionAfterRollback(ApplicationEvent event) {
appendTransactionPhase(TransactionPhase.AFTER_ROLLBACK);
extractTransactionDetails(event);
}
}
}