diff --git a/src/main/java/org/springframework/data/gemfire/transaction/GemfireTransactionManager.java b/src/main/java/org/springframework/data/gemfire/transaction/GemfireTransactionManager.java index 8c471e76..f8e8c892 100644 --- a/src/main/java/org/springframework/data/gemfire/transaction/GemfireTransactionManager.java +++ b/src/main/java/org/springframework/data/gemfire/transaction/GemfireTransactionManager.java @@ -12,9 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ - package org.springframework.data.gemfire.transaction; import static org.springframework.data.gemfire.transaction.GemfireTransactionManager.CacheHolder.newCacheHolder; @@ -89,8 +87,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage /** * Constructs an instance of the {@link GemfireTransactionManager}. */ - public GemfireTransactionManager() { - } + public GemfireTransactionManager() { } /** * Constructs an instance of the {@link GemfireTransactionManager} initialized with @@ -101,7 +98,9 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage * @see #afterPropertiesSet() */ public GemfireTransactionManager(GemFireCache cache) { + this.cache = cache; + afterPropertiesSet(); } @@ -110,7 +109,9 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage */ @Override public void afterPropertiesSet() { + Assert.notNull(this.cache, "Cache is required"); + this.cache.setCopyOnRead(isCopyOnRead()); } @@ -136,8 +137,11 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage */ @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + try { + CacheTransactionObject cacheTransaction = (CacheTransactionObject) transaction; + GemFireCache cache = getCache(); if (logger.isDebugEnabled()) { @@ -156,10 +160,10 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage cacheTransaction.setAndGetHolder(newCacheHolder(transactionId))); } } - catch (IllegalStateException e) { + catch (Exception cause) { throw new CannotCreateTransactionException(String.format("%1$s; %2$s", - "An existing, ongoing transaction is already associated with the current thread", - "are multiple transaction managers present?"), e); + "An existing, ongoing transaction is already associated with the current thread.", + " Are multiple transaction managers present?"), cause); } } @@ -168,6 +172,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage */ @Override protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + try { if (status.isDebug()) { logger.debug("Committing local cache transaction"); @@ -175,13 +180,13 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage getCacheTransactionManager().commit(); } - catch (IllegalStateException e) { - throw new NoTransactionException( - "No transaction is associated with the current thread; are multiple transaction managers present?", e); - } - catch (org.apache.geode.cache.TransactionException e) { + catch (org.apache.geode.cache.TransactionException cause) { throw new GemfireTransactionCommitException( - "Unexpected failure occurred on commit of local cache transaction", e); + "Unexpected failure occurred on commit of local cache transaction", cause); + } + catch (Exception cause) { + throw new NoTransactionException( + "No transaction is associated with the current thread; are multiple transaction managers present?", cause); } } @@ -190,6 +195,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage */ @Override protected Object doSuspend(Object transaction) throws TransactionException { + if (getCacheTransactionManager().suspend() != null) { TransactionSynchronizationManager.unbindResource(getCache()); return ((CacheTransactionObject) transaction).setAndGetExistingHolder(null); @@ -203,13 +209,14 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage */ @Override protected void doResume(Object transaction, Object suspendedResources) throws TransactionException { + if (suspendedResources instanceof CacheHolder) { + CacheHolder holder = (CacheHolder) suspendedResources; - boolean resumeSuccessful = (isResumeWaitTimeSet() - ? getCacheTransactionManager().tryResume(holder.getTransactionId(), - getResumeWaitTime(), getResumeWaitTimeUnit()) - : getCacheTransactionManager().tryResume(holder.getTransactionId())); + boolean resumeSuccessful = isResumeWaitTimeSet() + ? getCacheTransactionManager().tryResume(holder.getTransactionId(), getResumeWaitTime(), getResumeWaitTimeUnit()) + : getCacheTransactionManager().tryResume(holder.getTransactionId()); if (resumeSuccessful) { TransactionSynchronizationManager.bindResource(getCache(), @@ -223,6 +230,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage */ @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + try { if (status.isDebug()) { logger.debug("Rolling back local cache transaction"); @@ -230,9 +238,12 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage getCacheTransactionManager().rollback(); } - catch (IllegalStateException e) { - throw new NoTransactionException( - "No transaction is associated with the current thread; are multiple transaction managers present?", e); + catch (Exception cause) { + + String exceptionMessage = + "No transaction is associated with the current thread. Are multiple transaction managers present?"; + + throw new NoTransactionException(exceptionMessage, cause); } } @@ -311,7 +322,7 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage * @see #setCopyOnRead(boolean) */ public boolean isCopyOnRead() { - return copyOnRead; + return this.copyOnRead; } /** @@ -324,7 +335,9 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage * @see org.apache.geode.cache.Region */ public void setRegion(Region region) { + Assert.notNull(region, "Region must not be null"); + this.cache = (GemFireCache) region.getRegionService(); } @@ -365,8 +378,10 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage * @see #getResumeWaitTime() */ protected boolean isResumeWaitTimeSet() { + Long resumeWaitTime = getResumeWaitTime(); - return (resumeWaitTime != null && resumeWaitTime > 0); + + return resumeWaitTime != null && resumeWaitTime > 0; } /** @@ -401,40 +416,34 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage private CacheHolder cacheHolder; - /* (non-Javadoc) */ static CacheTransactionObject newCacheTransactionObject(CacheHolder cacheHolder) { CacheTransactionObject transactionObject = new CacheTransactionObject(); transactionObject.setHolder(cacheHolder); return transactionObject; } - /* (non-Javadoc) */ - boolean isHolding() { - return (getHolder() != null); - } - - /* (non-Javadoc) */ - CacheHolder getHolder() { - return this.cacheHolder; - } - - /* (non-Javadoc) */ - void setHolder(CacheHolder holder) { - this.cacheHolder = holder; - } - - /* (non-Javadoc) */ CacheHolder setAndGetExistingHolder(CacheHolder cacheHolder) { CacheHolder existingHolder = getHolder(); setHolder(cacheHolder); return existingHolder; } - /* (non-Javadoc) */ CacheHolder setAndGetHolder(CacheHolder holder) { setHolder(holder); return getHolder(); } + + void setHolder(CacheHolder cacheHolder) { + this.cacheHolder = cacheHolder; + } + + CacheHolder getHolder() { + return this.cacheHolder; + } + + boolean isHolding() { + return getHolder() != null; + } } /** @@ -446,24 +455,20 @@ public class GemfireTransactionManager extends AbstractPlatformTransactionManage private TransactionId transactionId; - /* (non-Javadoc) */ static CacheHolder newCacheHolder(TransactionId transactionId) { CacheHolder cacheHolder = new CacheHolder(); cacheHolder.transactionId = transactionId; return cacheHolder; } - /* (non-Javadoc) */ - boolean isRollbackOnly() { - return this.rollbackOnly; - } - - /* (non-Javadoc) */ void setRollbackOnly() { this.rollbackOnly = true; } - /* (non-Javadoc) */ + boolean isRollbackOnly() { + return this.rollbackOnly; + } + TransactionId getTransactionId() { return this.transactionId; } diff --git a/src/main/java/org/springframework/data/gemfire/transaction/TransactionApplicationEvent.java b/src/main/java/org/springframework/data/gemfire/transaction/TransactionApplicationEvent.java new file mode 100644 index 00000000..58493534 --- /dev/null +++ b/src/main/java/org/springframework/data/gemfire/transaction/TransactionApplicationEvent.java @@ -0,0 +1,76 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.gemfire.transaction; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Optional; + +import org.springframework.context.ApplicationEvent; +import org.springframework.util.StringUtils; + +/** + * The {@link TransactionApplicationEvent} is an implementation of {@link ApplicationEvent} which is fired during + * a transaction. + * + * @author John Blum + * @see java.time.Instant + * @see java.time.LocalDateTime + * @see org.springframework.context.ApplicationEvent + * @since 2.3.0 + */ +public class TransactionApplicationEvent extends ApplicationEvent { + + protected static final String TIMESTAMP_PATTERN = "yyyy-MM-dd-hh:mm:ss.S"; + + private String details; + + public TransactionApplicationEvent(Object source) { + this(source, null); + } + + public TransactionApplicationEvent(Object source, String details) { + + super(source); + + this.details = details; + } + + public Optional getDetails() { + return Optional.ofNullable(this.details).filter(StringUtils::hasText); + } + + public LocalDateTime getTimestampAsLocalDateTime() { + return LocalDateTime.from(Instant.ofEpochMilli(getTimestamp())); + } + + public String getTimestampAsString() { + return getTimestampAsString(TIMESTAMP_PATTERN); + } + + public String getTimestampAsString(String pattern) { + return getTimestampAsLocalDateTime().format(DateTimeFormatter.ofPattern(pattern)); + } + + @Override + public String toString() { + + return getDetails() + .map(details -> String.format("%s - %s", getTimestampAsString(), details)) + .orElse(String.format("%s[%s]", getClass().getSimpleName(), getTimestampAsString())); + } +} diff --git a/src/test/java/org/springframework/data/gemfire/transaction/TransactionalEventListenerIntegrationTests.java b/src/test/java/org/springframework/data/gemfire/transaction/TransactionalEventListenerIntegrationTests.java new file mode 100644 index 00000000..baa84886 --- /dev/null +++ b/src/test/java/org/springframework/data/gemfire/transaction/TransactionalEventListenerIntegrationTests.java @@ -0,0 +1,285 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.gemfire.transaction; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.data.gemfire.util.RuntimeExceptionFactory.newIllegalStateException; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.client.ClientRegionShortcut; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.data.annotation.Id; +import org.springframework.data.gemfire.config.annotation.ClientCacheApplication; +import org.springframework.data.gemfire.config.annotation.EnableEntityDefinedRegions; +import org.springframework.data.gemfire.mapping.GemfireMappingContext; +import org.springframework.data.gemfire.mapping.annotation.Region; +import org.springframework.data.gemfire.repository.support.GemfireRepositoryFactoryBean; +import org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions; +import org.springframework.data.repository.CrudRepository; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; +import org.springframework.util.Assert; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +/** + * Integration Tests for the Spring {@link TransactionalEventListener} in the context of Apache Geode + * cache transactions. + * + * @author John Blum + * @see org.junit.Test + * @see org.springframework.context.ApplicationEvent + * @see org.springframework.context.ApplicationEventPublisher + * @see org.springframework.context.annotation.Bean + * @see org.springframework.data.gemfire.config.annotation.ClientCacheApplication + * @see org.springframework.data.gemfire.transaction.TransactionApplicationEvent + * @see org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions + * @see org.springframework.test.context.ContextConfiguration + * @see org.springframework.test.context.junit4.SpringRunner + * @see org.springframework.transaction.annotation.Transactional + * @see org.springframework.transaction.event.TransactionPhase + * @see org.springframework.transaction.event.TransactionalEventListener + * @since 2.3.0 + */ +@RunWith(SpringRunner.class) +@ContextConfiguration +@SuppressWarnings("unused") +public class TransactionalEventListenerIntegrationTests { + + private static final String GEMFIRE_LOG_LEVEL = "error"; + + @Autowired + private CustomerService customerService; + + @Autowired + private TestTransactionEventListener transactionEventListener; + + @Test + public void successfulEntityTransactionTriggersCommitTransactionEvents() { + + Customer jonDoe = this.customerService.save(Customer.newCustomer(1L, "Jon Doe")); + + Customer jonDoeLoaded = this.customerService.findById(jonDoe.getId()); + + assertThat(jonDoeLoaded).isEqualTo(jonDoe); + + assertThat(this.transactionEventListener.getAndClearTransactionDetails()).containsExactly("1"); + + assertThat(this.transactionEventListener.getAndClearTransactionPhases()) + .containsExactly(TransactionPhase.BEFORE_COMMIT, TransactionPhase.AFTER_COMMIT); + } + + @Test(expected = IllegalStateException.class) + public void failingEntityTransactionTriggersRollbackTransactionEvent() { + + Customer janeDoe = Customer.newCustomer(2L, "Jane Doe"); + + try { + this.customerService.saveFailsAndRollsback(janeDoe); + } + catch (IllegalStateException expected) { + + assertThat(this.transactionEventListener.getAndClearTransactionDetails()).containsExactly("2"); + + assertThat(this.transactionEventListener.getAndClearTransactionPhases()) + .containsExactly(TransactionPhase.AFTER_ROLLBACK); + + assertThat(expected).hasMessage("TEST"); + assertThat(expected).hasNoCause(); + + try { + this.customerService.findById(janeDoe.getId()); + } + catch (IllegalStateException alsoExpected) { + + assertThat(alsoExpected).hasMessage("No Customer having ID [2] was found"); + assertThat(alsoExpected).hasNoCause(); + + throw alsoExpected; + } + } + } + + @ClientCacheApplication(logLevel = GEMFIRE_LOG_LEVEL) + @EnableEntityDefinedRegions( + basePackageClasses = Customer.class, + clientRegionShortcut = ClientRegionShortcut.LOCAL + ) + @EnableGemfireCacheTransactions + static class TestConfiguration { + + @Bean + GemfireRepositoryFactoryBean customerRepositoryFactoryBean() { + + GemfireRepositoryFactoryBean customerRepositoryFactoryBean + = new GemfireRepositoryFactoryBean<>(CustomerRepository.class); + + customerRepositoryFactoryBean.setGemfireMappingContext(new GemfireMappingContext()); + + return customerRepositoryFactoryBean; + } + + @Bean + CustomerService customerService(ApplicationEventPublisher eventPublisher, + CustomerRepository customerRepository) { + + return new CustomerService(eventPublisher, customerRepository); + } + + @Bean + TestTransactionEventListener testTransactionEventListener() { + return new TestTransactionEventListener(); + } + } + + @Data + @EqualsAndHashCode + @Region("Customers") + @RequiredArgsConstructor(staticName = "newCustomer") + static class Customer implements Serializable { + + @Id @NonNull + private Long id; + + @NonNull + private String name; + + } + + public interface CustomerRepository extends CrudRepository { } + + @Service + public static class CustomerService { + + private ApplicationEventPublisher eventPublisher; + + private final CustomerRepository customerRepository; + + public CustomerService(ApplicationEventPublisher eventPublisher, CustomerRepository customerRepository) { + + Assert.notNull(eventPublisher, "ApplicationEventPublisher is required"); + Assert.notNull(customerRepository, "CustomerRepository is required"); + + this.eventPublisher = eventPublisher; + this.customerRepository = customerRepository; + } + + public Customer findById(Long id) { + return this.customerRepository.findById(id) + .orElseThrow(() -> newIllegalStateException("No Customer having ID [%d] was found", id)); + } + + @Transactional + public Customer save(Customer customer) { + + customer = this.customerRepository.save(customer); + + this.eventPublisher.publishEvent(new TransactionApplicationEvent(this, + String.valueOf(customer.getId()))); + + return customer; + } + + @Transactional + public Customer saveFailsAndRollsback(Customer customer) { + + this.customerRepository.save(customer); + + this.eventPublisher.publishEvent(new TransactionApplicationEvent(this, + String.valueOf(customer.getId()))); + + throw newIllegalStateException("TEST"); + } + } + + @Component + public static class TestTransactionEventListener { + + private final List transactionPhases = new ArrayList<>(); + + private final Set transactionDetails = new HashSet<>(); + + public Set getAndClearTransactionDetails() { + + Set copy = new HashSet<>(this.transactionDetails); + + this.transactionDetails.clear(); + + return copy; + } + + public List getAndClearTransactionPhases() { + + List copy = new ArrayList<>(this.transactionPhases); + + this.transactionPhases.clear(); + + return copy; + } + + private void appendTransactionPhase(TransactionPhase transactionPhase) { + Optional.ofNullable(transactionPhase).ifPresent(this.transactionPhases::add); + } + + private void extractTransactionDetails(ApplicationEvent event) { + + Optional.ofNullable(event) + .filter(TransactionApplicationEvent.class::isInstance) + .map(TransactionApplicationEvent.class::cast) + .flatMap(TransactionApplicationEvent::getDetails) + .ifPresent(this.transactionDetails::add); + } + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void handleTransactionBeforeCommit(ApplicationEvent event) { + appendTransactionPhase(TransactionPhase.BEFORE_COMMIT); + extractTransactionDetails(event); + } + + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + public void handleTransactionAfterCommit(ApplicationEvent event) { + appendTransactionPhase(TransactionPhase.AFTER_COMMIT); + extractTransactionDetails(event); + } + + @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK) + public void handleTransactionAfterRollback(ApplicationEvent event) { + appendTransactionPhase(TransactionPhase.AFTER_ROLLBACK); + extractTransactionDetails(event); + } + } +}