Redis consumer

* Provide a reusable consumer bean.
* Options for redis key, queue or topic.
* Adding tests to verify.
* Addressing PR review comments
* Applying checkstyle changes
* Fixing build issues
* Move SpEL parsing to the `RedisConsumerProperties`
for cleaner code in the `RedisConsumerConfiguration`
* Remove non-existing `spring.redis.user` property from
the test `application.properties`
* Change `redis-consumer` module description to more official manner
This commit is contained in:
Soby Chacko
2020-05-06 13:18:43 -04:00
committed by Artem Bilan
parent 9d2a702228
commit 86fc242292
11 changed files with 567 additions and 2 deletions

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apps-build</artifactId>
<artifactId>stream-applications-build</artifactId>
<groupId>org.springframework.cloud.stream.app</groupId>
<version>Fahrenheit.BUILD-SNAPSHOT</version>
</parent>

View File

@@ -6,7 +6,7 @@
<description>Stream Apps Docs</description>
<parent>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>apps-build</artifactId>
<artifactId>stream-applications-build</artifactId>
<version>Fahrenheit.BUILD-SNAPSHOT</version>
</parent>

View File

@@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>redis-consumer</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<name>redis-consumer</name>
<description>Redis Consumer</description>
<parent>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-functions-parent</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<relativePath>../../spring-functions-parent</relativePath>
</parent>
<properties>
<spring-cloud-starters.version>2.2.0.RELEASE</spring-cloud-starters.version>
<embedded-redis.version>1.48</embedded-redis.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.playtika.testcontainers</groupId>
<artifactId>embedded-redis</artifactId>
<version>${embedded-redis.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter</artifactId>
<version>${spring-cloud-starters.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.consumer.redis;
import java.util.function.Consumer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.outbound.RedisPublishingMessageHandler;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;
import org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
/**
* @author Eric Bottard
* @author Mark Pollack
* @author Gary Russell
* @author Soby Chacko
* @author Artem Bilan
*/
@Configuration
@EnableConfigurationProperties(RedisConsumerProperties.class)
public class RedisConsumerConfiguration {
@Bean
public Consumer<Message<?>> redisConsumer() {
return redisConsumerMessageHandler(null, null)::handleMessage;
}
@Bean
public MessageHandler redisConsumerMessageHandler(RedisConnectionFactory redisConnectionFactory,
RedisConsumerProperties redisConsumerProperties) {
if (redisConsumerProperties.isKeyPresent()) {
RedisStoreWritingMessageHandler redisStoreWritingMessageHandler = new RedisStoreWritingMessageHandler(
redisConnectionFactory);
redisStoreWritingMessageHandler.setKeyExpression(redisConsumerProperties.keyExpression());
return redisStoreWritingMessageHandler;
}
else if (redisConsumerProperties.isQueuePresent()) {
return new RedisQueueOutboundChannelAdapter(redisConsumerProperties.queueExpression(),
redisConnectionFactory);
}
else { // must be topic
RedisPublishingMessageHandler redisPublishingMessageHandler = new RedisPublishingMessageHandler(
redisConnectionFactory);
redisPublishingMessageHandler.setTopicExpression(redisConsumerProperties.topicExpression());
return redisPublishingMessageHandler;
}
}
}

View File

@@ -0,0 +1,156 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.consumer.redis;
import java.util.Arrays;
import java.util.Collections;
import javax.validation.constraints.AssertTrue;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
/**
* Used to configure those Redis Sink module options that are not related to connecting to Redis.
*
* @author Eric Bottard
* @author Mark Pollack
* @author Artem Bilan
* @author Soby Chacko
*/
@ConfigurationProperties("redis.consumer")
@Validated
public class RedisConsumerProperties {
private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
/**
* A SpEL expression to use for topic.
*/
private String topicExpression;
/**
* A SpEL expression to use for queue.
*/
private String queueExpression;
/**
* A SpEL expression to use for storing to a key.
*/
private String keyExpression;
/**
* A literal key name to use when storing to a key.
*/
private String key;
/**
* A literal queue name to use when storing in a queue.
*/
private String queue;
/**
* A literal topic name to use when publishing to a topic.
*/
private String topic;
public Expression keyExpression() {
return key != null ? new LiteralExpression(key) : EXPRESSION_PARSER.parseExpression(keyExpression);
}
public Expression queueExpression() {
return queue != null ? new LiteralExpression(queue) : EXPRESSION_PARSER.parseExpression(queueExpression);
}
public Expression topicExpression() {
return topic != null ? new LiteralExpression(topic) : EXPRESSION_PARSER.parseExpression(topicExpression);
}
boolean isKeyPresent() {
return StringUtils.hasText(key) || keyExpression != null;
}
boolean isQueuePresent() {
return StringUtils.hasText(queue) || queueExpression != null;
}
boolean isTopicPresent() {
return StringUtils.hasText(topic) || topicExpression != null;
}
public String getTopicExpression() {
return topicExpression;
}
public void setTopicExpression(String topicExpression) {
this.topicExpression = topicExpression;
}
public String getQueueExpression() {
return queueExpression;
}
public void setQueueExpression(String queueExpression) {
this.queueExpression = queueExpression;
}
public String getKeyExpression() {
return keyExpression;
}
public void setKeyExpression(String keyExpression) {
this.keyExpression = keyExpression;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
// The javabean property name is what will be reported in case of violation. Make it meaningful
@AssertTrue(message = "Exactly one of 'queue', 'queueExpression', 'key', 'keyExpression', "
+ "'topic' and 'topicExpression' must be set")
public boolean isMutuallyExclusive() {
Object[] props = new Object[] { queue, queueExpression, key, keyExpression, topic, topicExpression };
return (props.length - 1) == Collections.frequency(Arrays.asList(props), null);
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.consumer.redis;
import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
/**
* @author Soby Chacko
*/
@SpringBootTest
@DirtiesContext
public class AbstractRedisConsumerTests {
@Autowired
Consumer<Message<?>> redisConsumer;
@Autowired
RedisConnectionFactory redisConnectionFactory;
@Autowired
StringRedisTemplate redisTemplate;
@SpringBootApplication
static class TestApplication {
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.consumer.redis;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.data.redis.support.collections.DefaultRedisList;
import org.springframework.data.redis.support.collections.RedisList;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.TestPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Mark Pollack
* @author Marius Bogoevici
* @author Gary Russell
*/
@TestPropertySource(properties = "redis.consumer.key = foo")
public class RedisConsumerKeyTests extends AbstractRedisConsumerTests {
@Test
public void testWithKey() {
//Setup
String key = "foo";
redisTemplate.delete(key);
RedisList<String> redisList = new DefaultRedisList<>(key, redisTemplate);
List<String> list = new ArrayList<>();
list.add("Manny");
list.add("Moe");
list.add("Jack");
//Execute
Message<List<String>> message = new GenericMessage<>(list);
redisConsumer.accept(message);
//Assert
assertThat(redisList.size()).isEqualTo(3);
assertThat(redisList.get(0)).isEqualTo("Manny");
assertThat(redisList.get(1)).isEqualTo("Moe");
assertThat(redisList.get(2)).isEqualTo("Jack");
//Cleanup
redisTemplate.delete(key);
}
}

View File

@@ -0,0 +1,44 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.consumer.redis;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
@TestPropertySource(properties = "redis.consumer.queue = test-queue")
public class RedisConsumerQueueTests extends AbstractRedisConsumerTests {
@Test
public void testWithQueue() {
Message<String> message = MessageBuilder.withPayload("hello").build();
redisConsumer.accept(message);
Object result = redisTemplate.boundListOps("test-queue").rightPop(5000, TimeUnit.MILLISECONDS);
assertThat(result).isEqualTo("hello");
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.consumer.redis;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
@TestPropertySource(properties = "redis.consumer.topic = foo-topic")
public class RedisConsumerTopicTests extends AbstractRedisConsumerTests {
@Autowired
RedisConnectionFactory connectionFactory;
@Test
public void testWithTopic() throws Exception {
int numToTest = 10;
String topic = "foo-topic";
final CountDownLatch latch = new CountDownLatch(numToTest);
MessageListenerAdapter listener = new MessageListenerAdapter();
listener.setDelegate(new Listener(latch));
listener.setSerializer(new StringRedisSerializer());
listener.afterPropertiesSet();
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.afterPropertiesSet();
container.addMessageListener(listener, Collections.<Topic>singletonList(new ChannelTopic(topic)));
container.start();
Awaitility.await().until(() -> TestUtils.getPropertyValue(container, "subscriptionTask.connection",
RedisConnection.class) != null);
Message<String> message = MessageBuilder.withPayload("hello").build();
for (int i = 0; i < numToTest; i++) {
redisConsumer.accept(message);
}
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
}
private static class Listener {
private final CountDownLatch latch;
Listener(CountDownLatch latch) {
this.latch = latch;
}
@SuppressWarnings("unused")
public void handleMessage(String s) {
this.latch.countDown();
}
}
}

View File

@@ -0,0 +1,3 @@
spring.redis.host=${embedded.redis.host}
spring.redis.port=${embedded.redis.port}
spring.redis.password=${embedded.redis.password}

View File

@@ -47,6 +47,7 @@
<module>consumer/log-consumer</module>
<module>consumer/mongodb-consumer</module>
<module>consumer/rabbit-consumer</module>
<module>consumer/redis-consumer</module>
<module>function/filter-function</module>
<module>function/spel-function</module>