DATAGEODE-110 - Move client Region interests registration to on SmartLifecycle start.
This commit is contained in:
@@ -54,6 +54,7 @@ import org.springframework.data.gemfire.config.annotation.RegionConfigurer;
|
||||
import org.springframework.data.gemfire.config.xml.GemfireConstants;
|
||||
import org.springframework.data.gemfire.eviction.EvictingRegionFactoryBean;
|
||||
import org.springframework.data.gemfire.expiration.ExpiringRegionFactoryBean;
|
||||
import org.springframework.data.gemfire.support.SmartLifecycleSupport;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -77,10 +78,11 @@ import org.springframework.util.StringUtils;
|
||||
* @see org.springframework.data.gemfire.DataPolicyConverter
|
||||
* @see org.springframework.data.gemfire.RegionLookupFactoryBean
|
||||
* @see org.springframework.data.gemfire.config.annotation.RegionConfigurer
|
||||
* @see org.springframework.data.gemfire.support.SmartLifecycleSupport
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
public class ClientRegionFactoryBean<K, V> extends RegionLookupFactoryBean<K, V>
|
||||
implements EvictingRegionFactoryBean, ExpiringRegionFactoryBean<K, V>, DisposableBean {
|
||||
implements SmartLifecycleSupport, EvictingRegionFactoryBean, ExpiringRegionFactoryBean<K, V>, DisposableBean {
|
||||
|
||||
public static final String DEFAULT_POOL_NAME = "DEFAULT";
|
||||
public static final String GEMFIRE_POOL_NAME = GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME;
|
||||
@@ -462,8 +464,6 @@ public class ClientRegionFactoryBean<K, V> extends RegionLookupFactoryBean<K, V>
|
||||
|
||||
super.postProcess(region);
|
||||
|
||||
registerInterests(region);
|
||||
|
||||
Optional.ofNullable(this.cacheLoader)
|
||||
.ifPresent(cacheLoader -> region.getAttributesMutator().setCacheLoader(cacheLoader));
|
||||
|
||||
@@ -473,11 +473,21 @@ public class ClientRegionFactoryBean<K, V> extends RegionLookupFactoryBean<K, V>
|
||||
return region;
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
/**
|
||||
* Registers interests in the startup lifecycle phase of the Spring container.
|
||||
*
|
||||
* @see #getRegion()
|
||||
* @see #registerInterests(Region)
|
||||
*/
|
||||
@Override
|
||||
public void start() {
|
||||
registerInterests(getRegion());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Region<K, V> registerInterests(Region<K, V> region) {
|
||||
|
||||
stream(nullSafeArray(this.interests, Interest.class)).forEach(interest -> {
|
||||
stream(nullSafeArray(getInterests(), Interest.class)).forEach(interest -> {
|
||||
|
||||
if (interest.isRegexType()) {
|
||||
region.registerInterestRegex((String) interest.getKey(), interest.getPolicy(),
|
||||
@@ -590,7 +600,7 @@ public class ClientRegionFactoryBean<K, V> extends RegionLookupFactoryBean<K, V>
|
||||
*/
|
||||
public void setClose(boolean close) {
|
||||
this.close = close;
|
||||
this.destroy = (this.destroy && !close); // retain previous value iff close is false.
|
||||
this.destroy = this.destroy && !close; // retain previous value iff close is false.
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -142,7 +142,6 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
this.initialized = true;
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private void applyContinuousQueryListenerContainerConfigurers() {
|
||||
applyContinuousQueryListenerContainerConfigurers(getCompositeContinuousQueryListenerContainerConfigurer());
|
||||
}
|
||||
@@ -252,7 +251,14 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
return getQueryService();
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
/**
|
||||
* Verifies the given {@link QueryService} is valid.
|
||||
*
|
||||
* @param queryService {@link QueryService} to validate.
|
||||
* @throws IllegalStateException if the {@link QueryService} is {@literal null}.
|
||||
* @return the given {@link QueryService}
|
||||
* @see org.apache.geode.cache.query.QueryService
|
||||
*/
|
||||
private QueryService validateQueryService(QueryService queryService) {
|
||||
|
||||
Assert.state(queryService != null, "QueryService is required");
|
||||
@@ -304,7 +310,6 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
initContinuousQueries(getContinuousQueryDefinitions());
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private void initContinuousQueries(Set<ContinuousQueryDefinition> continuousQueryDefinitions) {
|
||||
|
||||
// Stop the ContinuousQueryListenerContainer if currently running...
|
||||
@@ -609,13 +614,16 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
}
|
||||
|
||||
public boolean addContinuousQueryDefinition(ContinuousQueryDefinition definition) {
|
||||
return Optional.ofNullable(definition).map(it -> getContinuousQueryDefinitions().add(it)).orElse(false);
|
||||
|
||||
return Optional.ofNullable(definition)
|
||||
.map(getContinuousQueryDefinitions()::add)
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
CqQuery addContinuousQuery(ContinuousQueryDefinition definition) {
|
||||
|
||||
try {
|
||||
|
||||
CqAttributes attributes = definition.toCqAttributes(this::newCqListener);
|
||||
|
||||
CqQuery query = (definition.isNamed() ? newNamedContinuousQuery(definition, attributes)
|
||||
@@ -628,26 +636,22 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
protected CqListener newCqListener(ContinuousQueryListener listener) {
|
||||
return new EventDispatcherAdapter(listener);
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private CqQuery newNamedContinuousQuery(ContinuousQueryDefinition definition, CqAttributes attributes)
|
||||
throws QueryException {
|
||||
|
||||
return getQueryService().newCq(definition.getName(), definition.getQuery(), attributes, definition.isDurable());
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private CqQuery newUnnamedContinuousQuery(ContinuousQueryDefinition definition, CqAttributes attributes)
|
||||
throws CqException {
|
||||
|
||||
return getQueryService().newCq(definition.getQuery(), attributes, definition.isDurable());
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private CqQuery manage(CqQuery query) {
|
||||
getContinuousQueries().add(query);
|
||||
return query;
|
||||
@@ -667,12 +671,10 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
void doStart() {
|
||||
getContinuousQueries().forEach(this::execute);
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private void execute(CqQuery query) {
|
||||
|
||||
try {
|
||||
@@ -765,10 +767,10 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
void doStop() {
|
||||
|
||||
getContinuousQueries().forEach(query -> {
|
||||
|
||||
try {
|
||||
query.stop();
|
||||
}
|
||||
@@ -789,7 +791,6 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
this.initialized = false;
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private void closeQueries() {
|
||||
|
||||
getContinuousQueries().stream().filter(query -> !query.isClosed()).forEach(query -> {
|
||||
@@ -807,7 +808,6 @@ public class ContinuousQueryListenerContainer implements BeanFactoryAware, BeanN
|
||||
getContinuousQueries().clear();
|
||||
}
|
||||
|
||||
/* (non-Javadoc) */
|
||||
private void destroyExecutor() {
|
||||
|
||||
Optional.ofNullable(getTaskExecutor())
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.data.gemfire.support;
|
||||
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
|
||||
/**
|
||||
* The {@link SmartLifecycleSupport} interface is an extension of Spring's {@link SmartLifecycle} interface
|
||||
* providing default, convenient behavior for many of the lifecycle methods as well as a serving
|
||||
* as a {@link FunctionalInterface}.
|
||||
*
|
||||
* @author John Blum
|
||||
* @see java.lang.FunctionalInterface
|
||||
* @see org.springframework.context.SmartLifecycle
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@FunctionalInterface
|
||||
@SuppressWarnings("unused")
|
||||
public interface SmartLifecycleSupport extends SmartLifecycle {
|
||||
|
||||
boolean DEFAULT_AUTO_STARTUP = true;
|
||||
boolean DEFAULT_IS_RUNNING = false;
|
||||
|
||||
int DEFAULT_PHASE = 0;
|
||||
|
||||
@Override
|
||||
default boolean isAutoStartup() {
|
||||
return DEFAULT_AUTO_STARTUP;
|
||||
}
|
||||
|
||||
@Override
|
||||
default void stop(Runnable runnable) {
|
||||
stop();
|
||||
runnable.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
default void stop() { }
|
||||
|
||||
@Override
|
||||
default boolean isRunning() {
|
||||
return DEFAULT_IS_RUNNING;
|
||||
}
|
||||
|
||||
@Override
|
||||
default int getPhase() {
|
||||
return DEFAULT_PHASE;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.data.gemfire.support;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link SmartLifecycleSupport}.
|
||||
*
|
||||
* @author John Blum
|
||||
* @see org.junit.Test
|
||||
* @see org.mockito.Mockito
|
||||
* @see org.springframework.data.gemfire.support.SmartLifecycleSupport
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public class SmartLifecycleSupportUnitTests {
|
||||
|
||||
@Test
|
||||
public void isAutoStartupReturnsTrueByDefault() {
|
||||
|
||||
SmartLifecycle smartLifecycle = mock(SmartLifecycleSupport.class);
|
||||
|
||||
when(smartLifecycle.isAutoStartup()).thenCallRealMethod();
|
||||
|
||||
assertThat(smartLifecycle.isAutoStartup()).isTrue();
|
||||
|
||||
verify(smartLifecycle, times(1)).isAutoStartup();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stopWithRunnableCallsStopCallsRunnableRun() {
|
||||
|
||||
Runnable mockRunnable = mock(Runnable.class);
|
||||
|
||||
SmartLifecycle smartLifecycle = mock(SmartLifecycleSupport.class);
|
||||
|
||||
doCallRealMethod().when(smartLifecycle).stop(any(Runnable.class));
|
||||
|
||||
smartLifecycle.stop(mockRunnable);
|
||||
|
||||
verify(smartLifecycle, times(1)).stop();
|
||||
verify(mockRunnable, times(1)).run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isRunningReturnsFalse() {
|
||||
|
||||
SmartLifecycle smartLifecycle = mock(SmartLifecycleSupport.class);
|
||||
|
||||
when(smartLifecycle.isRunning()).thenCallRealMethod();
|
||||
|
||||
assertThat(smartLifecycle.isRunning()).isFalse();
|
||||
|
||||
verify(smartLifecycle, times(1)).isRunning();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getPhaseReturnsDefaultPhase() {
|
||||
|
||||
SmartLifecycle smartLifecycle = mock(SmartLifecycleSupport.class);
|
||||
|
||||
when(smartLifecycle.getPhase()).thenCallRealMethod();
|
||||
|
||||
assertThat(smartLifecycle.getPhase()).isEqualTo(SmartLifecycleSupport.DEFAULT_PHASE);
|
||||
|
||||
verify(smartLifecycle, times(1)).getPhase();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user