diff --git a/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java b/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java index 9478d47a9..e698b701e 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java +++ b/src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java @@ -113,6 +113,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter private RedisOperations redisOps; private RedisConverter converter; private @Nullable RedisMessageListenerContainer messageListenerContainer; + private boolean managedListenerContainer = true; private final AtomicReference expirationListener = new AtomicReference<>(null); private @Nullable ApplicationEventPublisher eventPublisher; @@ -195,7 +196,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter this.converter = redisConverter; this.redisOps = redisOps; - initMessageListenerContainer(); } /** @@ -236,7 +236,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter connection.hMSet(objectKey, rdo.getBucket().rawMap()); - if(isNew) { + if (isNew) { connection.sAdd(toBytes(rdo.getKeyspace()), key); } @@ -351,7 +351,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter connection.sRem(binKeyspace, binId); new IndexWriter(connection, converter).removeKeyFromIndexes(asString(keyspace), binId); - if(RedisKeyValueAdapter.this.keepShadowCopy()) { + if (RedisKeyValueAdapter.this.keepShadowCopy()) { RedisPersistentEntity persistentEntity = converter.getMappingContext().getPersistentEntity(type); if (persistentEntity != null && persistentEntity.isExpiring()) { @@ -524,7 +524,7 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter connection.persist(redisKey); - if(keepShadowCopy()) { + if (keepShadowCopy()) { connection.del(ByteUtils.concat(redisKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX)); } } @@ -685,7 +685,6 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter /** * @return {@literal true} if {@link RedisData#getTimeToLive()} has a positive value. - * * @param data must not be {@literal null}. * @since 2.3.7 */ @@ -703,6 +702,28 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter this.enableKeyspaceEvents = enableKeyspaceEvents; } + /** + * Configure a {@link RedisMessageListenerContainer} to listen for Keyspace expiry events. The container can only be + * set when this bean hasn't been yet {@link #afterPropertiesSet() initialized}. + * + * @param messageListenerContainer the container to use. + * @since 2.6.6 + * @throws IllegalStateException when trying to set a {@link RedisMessageListenerContainer} after + * {@link #afterPropertiesSet()} has been called to initialize a managed container instance. + */ + public void setMessageListenerContainer(RedisMessageListenerContainer messageListenerContainer) { + + Assert.notNull(messageListenerContainer, "RedisMessageListenerContainer must not be null"); + + if (this.managedListenerContainer && this.messageListenerContainer != null) { + throw new IllegalStateException( + "Cannot set RedisMessageListenerContainer after initializing a managed RedisMessageListenerContainer instance"); + } + + this.managedListenerContainer = false; + this.messageListenerContainer = messageListenerContainer; + } + /** * Configure the {@literal notify-keyspace-events} property if not already set. Use an empty {@link String} or * {@literal null} to retain existing server settings. @@ -731,6 +752,10 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter @Override public void afterPropertiesSet() { + if (this.managedListenerContainer) { + initMessageListenerContainer(); + } + if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) { initKeyExpirationListener(); } @@ -746,8 +771,9 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter this.expirationListener.get().destroy(); } - if (this.messageListenerContainer != null) { + if (this.managedListenerContainer && this.messageListenerContainer != null) { this.messageListenerContainer.destroy(); + this.messageListenerContainer = null; } } diff --git a/src/main/java/org/springframework/data/redis/repository/configuration/EnableRedisRepositories.java b/src/main/java/org/springframework/data/redis/repository/configuration/EnableRedisRepositories.java index dbaf0078c..873444c93 100644 --- a/src/main/java/org/springframework/data/redis/repository/configuration/EnableRedisRepositories.java +++ b/src/main/java/org/springframework/data/redis/repository/configuration/EnableRedisRepositories.java @@ -167,6 +167,16 @@ public @interface EnableRedisRepositories { */ EnableKeyspaceEvents enableKeyspaceEvents() default EnableKeyspaceEvents.OFF; + /** + * Configure the name of the {@link org.springframework.data.redis.listener.RedisMessageListenerContainer} bean to be + * used for keyspace event subscriptions. Defaults to use an anonymous managed instance by + * {@link org.springframework.data.redis.core.RedisKeyValueAdapter}. + * + * @return + * @since 2.6.6 + */ + String messageListenerContainerRef() default ""; + /** * Configuration flag controlling storage of phantom keys (shadow copies) of expiring entities to read them later when * publishing {@link org.springframework.data.redis.core.RedisKeyspaceEvent keyspace events}. diff --git a/src/main/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationExtension.java b/src/main/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationExtension.java index 9998d5106..b5d9c821f 100644 --- a/src/main/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationExtension.java +++ b/src/main/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationExtension.java @@ -139,7 +139,7 @@ public class RedisRepositoryConfigurationExtension extends KeyValueRepositoryCon private static AbstractBeanDefinition createRedisKeyValueAdapter(RepositoryConfigurationSource configuration) { - return BeanDefinitionBuilder.rootBeanDefinition(RedisKeyValueAdapter.class) // + BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(RedisKeyValueAdapter.class) // .addConstructorArgReference(configuration.getRequiredAttribute("redisTemplateRef", String.class)) // .addConstructorArgReference(REDIS_CONVERTER_BEAN_NAME) // .addPropertyValue("enableKeyspaceEvents", @@ -147,8 +147,12 @@ public class RedisRepositoryConfigurationExtension extends KeyValueRepositoryCon .addPropertyValue("keyspaceNotificationsConfigParameter", configuration.getAttribute("keyspaceNotificationsConfigParameter", String.class).orElse("")) // .addPropertyValue("shadowCopy", - configuration.getRequiredAttribute("shadowCopy", ShadowCopy.class)) // - .getBeanDefinition(); + configuration.getRequiredAttribute("shadowCopy", ShadowCopy.class)); + + configuration.getAttribute("messageListenerContainerRef") + .ifPresent(it -> builder.addPropertyReference("messageListenerContainer", it)); + + return builder.getBeanDefinition(); } private static AbstractBeanDefinition createRedisReferenceResolverDefinition(String redisTemplateRef) { diff --git a/src/test/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationUnitTests.java b/src/test/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationUnitTests.java index b3d394da5..9e03191b3 100644 --- a/src/test/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationUnitTests.java +++ b/src/test/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationUnitTests.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.*; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -32,6 +33,7 @@ import org.springframework.data.redis.core.RedisHash; import org.springframework.data.redis.core.RedisKeyValueAdapter; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.convert.ReferenceResolver; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.repository.Repository; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; @@ -39,7 +41,10 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.util.ReflectionTestUtils; /** + * Unit tests for Redis Repository configuration. + * * @author Christoph Strobl + * @author Mark Paluch */ public class RedisRepositoryConfigurationUnitTests { @@ -123,6 +128,39 @@ public class RedisRepositoryConfigurationUnitTests { } } + @ExtendWith(SpringExtension.class) + @DirtiesContext + @ContextConfiguration(classes = { WithMessageListenerConfigurationUnitTests.Config.class }) + public static class WithMessageListenerConfigurationUnitTests { + + @EnableRedisRepositories(considerNestedRepositories = true, + includeFilters = { @ComponentScan.Filter(type = FilterType.REGEX, pattern = { ".*ContextSampleRepository" }) }, + keyspaceNotificationsConfigParameter = "", messageListenerContainerRef = "myContainer") + static class Config { + + @Bean + RedisMessageListenerContainer myContainer() { + return mock(RedisMessageListenerContainer.class); + } + + @Bean + RedisTemplate redisTemplate() { + return createTemplateMock(); + } + } + + @Autowired ApplicationContext ctx; + + @Test // DATAREDIS-425 + public void shouldConfigureMessageListenerContainer() { + + RedisKeyValueAdapter adapter = ctx.getBean("redisKeyValueAdapter", RedisKeyValueAdapter.class); + Object messageListenerContainer = ReflectionTestUtils.getField(adapter, "messageListenerContainer"); + + assertThat(Mockito.mockingDetails(messageListenerContainer).isMock()).isTrue(); + } + } + @RedisHash static class Sample { String id;