Allow configuration of RedisMessageListenerContainer through @EnableRedisRepositories.
We now support configuration of a bean reference to RedisMessageListenerContainer that should be used with `RedisKeyValueAdapter` for easier configuration of the listener container. Closes #1827
This commit is contained in:
@@ -113,6 +113,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
private RedisOperations<?, ?> redisOps;
|
||||
private RedisConverter converter;
|
||||
private @Nullable RedisMessageListenerContainer messageListenerContainer;
|
||||
private boolean managedListenerContainer = true;
|
||||
private final AtomicReference<KeyExpirationEventMessageListener> expirationListener = new AtomicReference<>(null);
|
||||
private @Nullable ApplicationEventPublisher eventPublisher;
|
||||
|
||||
@@ -195,7 +196,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
|
||||
this.converter = redisConverter;
|
||||
this.redisOps = redisOps;
|
||||
initMessageListenerContainer();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -236,7 +236,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
|
||||
connection.hMSet(objectKey, rdo.getBucket().rawMap());
|
||||
|
||||
if(isNew) {
|
||||
if (isNew) {
|
||||
connection.sAdd(toBytes(rdo.getKeyspace()), key);
|
||||
}
|
||||
|
||||
@@ -351,7 +351,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
connection.sRem(binKeyspace, binId);
|
||||
new IndexWriter(connection, converter).removeKeyFromIndexes(asString(keyspace), binId);
|
||||
|
||||
if(RedisKeyValueAdapter.this.keepShadowCopy()) {
|
||||
if (RedisKeyValueAdapter.this.keepShadowCopy()) {
|
||||
|
||||
RedisPersistentEntity<?> persistentEntity = converter.getMappingContext().getPersistentEntity(type);
|
||||
if (persistentEntity != null && persistentEntity.isExpiring()) {
|
||||
@@ -524,7 +524,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
|
||||
connection.persist(redisKey);
|
||||
|
||||
if(keepShadowCopy()) {
|
||||
if (keepShadowCopy()) {
|
||||
connection.del(ByteUtils.concat(redisKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX));
|
||||
}
|
||||
}
|
||||
@@ -685,7 +685,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
|
||||
/**
|
||||
* @return {@literal true} if {@link RedisData#getTimeToLive()} has a positive value.
|
||||
*
|
||||
* @param data must not be {@literal null}.
|
||||
* @since 2.3.7
|
||||
*/
|
||||
@@ -703,6 +702,28 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
this.enableKeyspaceEvents = enableKeyspaceEvents;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a {@link RedisMessageListenerContainer} to listen for Keyspace expiry events. The container can only be
|
||||
* set when this bean hasn't been yet {@link #afterPropertiesSet() initialized}.
|
||||
*
|
||||
* @param messageListenerContainer the container to use.
|
||||
* @since 2.6.6
|
||||
* @throws IllegalStateException when trying to set a {@link RedisMessageListenerContainer} after
|
||||
* {@link #afterPropertiesSet()} has been called to initialize a managed container instance.
|
||||
*/
|
||||
public void setMessageListenerContainer(RedisMessageListenerContainer messageListenerContainer) {
|
||||
|
||||
Assert.notNull(messageListenerContainer, "RedisMessageListenerContainer must not be null");
|
||||
|
||||
if (this.managedListenerContainer && this.messageListenerContainer != null) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot set RedisMessageListenerContainer after initializing a managed RedisMessageListenerContainer instance");
|
||||
}
|
||||
|
||||
this.managedListenerContainer = false;
|
||||
this.messageListenerContainer = messageListenerContainer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the {@literal notify-keyspace-events} property if not already set. Use an empty {@link String} or
|
||||
* {@literal null} to retain existing server settings.
|
||||
@@ -731,6 +752,10 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
|
||||
if (this.managedListenerContainer) {
|
||||
initMessageListenerContainer();
|
||||
}
|
||||
|
||||
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) {
|
||||
initKeyExpirationListener();
|
||||
}
|
||||
@@ -746,8 +771,9 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
|
||||
this.expirationListener.get().destroy();
|
||||
}
|
||||
|
||||
if (this.messageListenerContainer != null) {
|
||||
if (this.managedListenerContainer && this.messageListenerContainer != null) {
|
||||
this.messageListenerContainer.destroy();
|
||||
this.messageListenerContainer = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -167,6 +167,16 @@ public @interface EnableRedisRepositories {
|
||||
*/
|
||||
EnableKeyspaceEvents enableKeyspaceEvents() default EnableKeyspaceEvents.OFF;
|
||||
|
||||
/**
|
||||
* Configure the name of the {@link org.springframework.data.redis.listener.RedisMessageListenerContainer} bean to be
|
||||
* used for keyspace event subscriptions. Defaults to use an anonymous managed instance by
|
||||
* {@link org.springframework.data.redis.core.RedisKeyValueAdapter}.
|
||||
*
|
||||
* @return
|
||||
* @since 2.6.6
|
||||
*/
|
||||
String messageListenerContainerRef() default "";
|
||||
|
||||
/**
|
||||
* Configuration flag controlling storage of phantom keys (shadow copies) of expiring entities to read them later when
|
||||
* publishing {@link org.springframework.data.redis.core.RedisKeyspaceEvent keyspace events}.
|
||||
|
||||
@@ -139,7 +139,7 @@ public class RedisRepositoryConfigurationExtension extends KeyValueRepositoryCon
|
||||
|
||||
private static AbstractBeanDefinition createRedisKeyValueAdapter(RepositoryConfigurationSource configuration) {
|
||||
|
||||
return BeanDefinitionBuilder.rootBeanDefinition(RedisKeyValueAdapter.class) //
|
||||
BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(RedisKeyValueAdapter.class) //
|
||||
.addConstructorArgReference(configuration.getRequiredAttribute("redisTemplateRef", String.class)) //
|
||||
.addConstructorArgReference(REDIS_CONVERTER_BEAN_NAME) //
|
||||
.addPropertyValue("enableKeyspaceEvents",
|
||||
@@ -147,8 +147,12 @@ public class RedisRepositoryConfigurationExtension extends KeyValueRepositoryCon
|
||||
.addPropertyValue("keyspaceNotificationsConfigParameter",
|
||||
configuration.getAttribute("keyspaceNotificationsConfigParameter", String.class).orElse("")) //
|
||||
.addPropertyValue("shadowCopy",
|
||||
configuration.getRequiredAttribute("shadowCopy", ShadowCopy.class)) //
|
||||
.getBeanDefinition();
|
||||
configuration.getRequiredAttribute("shadowCopy", ShadowCopy.class));
|
||||
|
||||
configuration.getAttribute("messageListenerContainerRef")
|
||||
.ifPresent(it -> builder.addPropertyReference("messageListenerContainer", it));
|
||||
|
||||
return builder.getBeanDefinition();
|
||||
}
|
||||
|
||||
private static AbstractBeanDefinition createRedisReferenceResolverDefinition(String redisTemplateRef) {
|
||||
|
||||
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.*;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
@@ -32,6 +33,7 @@ import org.springframework.data.redis.core.RedisHash;
|
||||
import org.springframework.data.redis.core.RedisKeyValueAdapter;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.convert.ReferenceResolver;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.repository.Repository;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
@@ -39,7 +41,10 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
/**
|
||||
* Unit tests for Redis Repository configuration.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
*/
|
||||
public class RedisRepositoryConfigurationUnitTests {
|
||||
|
||||
@@ -123,6 +128,39 @@ public class RedisRepositoryConfigurationUnitTests {
|
||||
}
|
||||
}
|
||||
|
||||
@ExtendWith(SpringExtension.class)
|
||||
@DirtiesContext
|
||||
@ContextConfiguration(classes = { WithMessageListenerConfigurationUnitTests.Config.class })
|
||||
public static class WithMessageListenerConfigurationUnitTests {
|
||||
|
||||
@EnableRedisRepositories(considerNestedRepositories = true,
|
||||
includeFilters = { @ComponentScan.Filter(type = FilterType.REGEX, pattern = { ".*ContextSampleRepository" }) },
|
||||
keyspaceNotificationsConfigParameter = "", messageListenerContainerRef = "myContainer")
|
||||
static class Config {
|
||||
|
||||
@Bean
|
||||
RedisMessageListenerContainer myContainer() {
|
||||
return mock(RedisMessageListenerContainer.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
RedisTemplate<?, ?> redisTemplate() {
|
||||
return createTemplateMock();
|
||||
}
|
||||
}
|
||||
|
||||
@Autowired ApplicationContext ctx;
|
||||
|
||||
@Test // DATAREDIS-425
|
||||
public void shouldConfigureMessageListenerContainer() {
|
||||
|
||||
RedisKeyValueAdapter adapter = ctx.getBean("redisKeyValueAdapter", RedisKeyValueAdapter.class);
|
||||
Object messageListenerContainer = ReflectionTestUtils.getField(adapter, "messageListenerContainer");
|
||||
|
||||
assertThat(Mockito.mockingDetails(messageListenerContainer).isMock()).isTrue();
|
||||
}
|
||||
}
|
||||
|
||||
@RedisHash
|
||||
static class Sample {
|
||||
String id;
|
||||
|
||||
Reference in New Issue
Block a user