diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
index 65b0722..4a60fd5 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
@@ -20,10 +20,6 @@
com.couchbase.client
java-client
-
- org.springframework.cloud
- spring-cloud-function-context
-
org.springframework.boot
spring-boot-configuration-processor
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
index c70c55c..4a5d0ec 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
@@ -16,10 +16,8 @@
package io.spring.example.couchbase.consumer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -28,8 +26,7 @@ import javax.annotation.PreDestroy;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.kv.MutationResult;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.junit.jupiter.api.BeforeAll;
+import io.spring.example.couchbase.consumer.domain.User;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.couchbase.BucketDefinition;
@@ -40,10 +37,8 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.builder.SpringApplicationBuilder;
-import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
@@ -51,182 +46,177 @@ import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-@Testcontainers
-public class CouchbaseConsumerTests {
- @Container
- static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
- .withBucket(new BucketDefinition("test"));
- static Map connectProperties = new HashMap<>();
-
- @BeforeAll
- static void initialize() {
- connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
- connectProperties.put("spring.couchbase.username", container.getUsername());
- connectProperties.put("spring.couchbase.password", container.getPassword());
- }
-
- private SpringApplicationBuilder applicationBuilder;
+ private ApplicationContextRunner applicationContextRunner;
@BeforeEach
void setup() {
- applicationBuilder = new SpringApplicationBuilder(TestConfig.class).web(WebApplicationType.NONE)
- .properties(connectProperties);
+ applicationContextRunner = new ApplicationContextRunner()
+ .withUserConfiguration(TestConfig.class)
+ .withPropertyValues(
+ "spring.couchbase.connection-string=" + container.getConnectionString(),
+ "spring.couchbase.username=" + container.getUsername(),
+ "spring.couchbase.password=" + container.getPassword());
}
@Test
void keyExpressionRequired() {
assertThatExceptionOfType(RuntimeException.class).isThrownBy(
- () -> applicationBuilder.run("--couchbase.consumer.bucket-expression='test'")) // faster
+ () -> applicationContextRunner.withPropertyValues("couchbase.consumer.bucket-expression='test'")
+ .run(context -> context.start()))
.havingRootCause()
.withMessageContaining("'keyExpression' is required");
}
@Test
void singleUpsert() {
- try (ConfigurableApplicationContext context = applicationBuilder
- .properties(Map.of(
- "couchbase.consumer.bucketExpression", "'test'",
- "couchbase.consumer.keyExpression", "payload.email"))
- .run()) {
- CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
- Cluster cluster = context.getBean(Cluster.class);
- String bucketName = properties.getBucketExpression().getValue(String.class);
- Function>, Flux> couchbaseConsumerFunction = context
- .getBean("couchbaseConsumerFunction", Function.class);
- StepVerifier.create(couchbaseConsumerFunction
- .apply(Flux.just(new GenericMessage<>(new User("David", "david@david.com")))))
- .expectNextMatches(mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
- bucketName))
- .verifyComplete();
+ applicationContextRunner.withPropertyValues(
+ "couchbase.consumer.bucketExpression='test'",
+ "couchbase.consumer.keyExpression=payload.email")
+ .run(context -> {
+ CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
+ Cluster cluster = context.getBean(Cluster.class);
+ String bucketName = properties.getBucketExpression().getValue(String.class);
+ Function>, Flux> couchbaseConsumerFunction = context
+ .getBean("couchbaseConsumerFunction", Function.class);
+ StepVerifier.create(couchbaseConsumerFunction
+ .apply(Flux.just(new GenericMessage<>(new User("David", "david@david.com")))))
+ .expectNextMatches(
+ mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
+ bucketName))
+ .verifyComplete();
- User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com").contentAs(User.class);
- assertThat(saved.getName()).isEqualTo("David");
- }
+ User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com")
+ .contentAs(User.class);
+ assertThat(saved.getName()).isEqualTo("David");
+ });
}
@Test
void singleUpsertConsumer() {
- try (ConfigurableApplicationContext context = applicationBuilder
- .properties(Map.of(
- "couchbase.consumer.bucketExpression", "'test'",
- "couchbase.consumer.keyExpression", "payload.email"))
- .run()) {
- CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
- Cluster cluster = context.getBean(Cluster.class);
- String bucketName = properties.getBucketExpression().getValue(String.class);
- Consumer>> couchbaseConsumer = context
- .getBean("couchbaseConsumer", Consumer.class);
- couchbaseConsumer.accept(Flux.just(new GenericMessage<>(new User("David", "david@david.com"))));
+ applicationContextRunner.withPropertyValues(
+ "couchbase.consumer.bucketExpression='test'",
+ "couchbase.consumer.keyExpression=payload.email")
+ .run(context -> {
+ CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
+ Cluster cluster = context.getBean(Cluster.class);
+ String bucketName = properties.getBucketExpression().getValue(String.class);
+ Consumer>> couchbaseConsumer = context
+ .getBean("couchbaseConsumer", Consumer.class);
+ couchbaseConsumer.accept(Flux.just(new GenericMessage<>(new User("David", "david@david.com"))));
- User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com").contentAs(User.class);
- assertThat(saved.getName()).isEqualTo("David");
- }
+ User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com")
+ .contentAs(User.class);
+ assertThat(saved.getName()).isEqualTo("David");
+ });
}
-
@Test
void multipleUpsert() {
- try (ConfigurableApplicationContext context = applicationBuilder
- .properties(Map.of(
- "couchbase.consumer.bucketExpression", "'test'",
- "couchbase.consumer.keyExpression", "payload.email"))
- .run()) {
- CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
- Cluster cluster = context.getBean(Cluster.class);
- String bucketName = properties.getBucketExpression().getValue(String.class);
- User user1 = new User("David", "david@david.com");
- User user2 = new User("Nanette", "nanette@nanette.com");
- User user3 = new User("Soby", "soby@soby.com");
+ applicationContextRunner.withPropertyValues(
+ "couchbase.consumer.bucketExpression='test'",
+ "couchbase.consumer.keyExpression=payload.email")
+ .run(context -> {
+ CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
+ Cluster cluster = context.getBean(Cluster.class);
+ String bucketName = properties.getBucketExpression().getValue(String.class);
+ User user1 = new User("David", "david@david.com");
+ User user2 = new User("Nanette", "nanette@nanette.com");
+ User user3 = new User("Soby", "soby@soby.com");
- Function>, Flux> couchbaseConsumerFunction = context
- .getBean("couchbaseConsumerFunction", Function.class);
- StepVerifier.create(couchbaseConsumerFunction
- .apply(Flux.just(
- new GenericMessage<>(user1),
- new GenericMessage<>(user2),
- new GenericMessage<>(user3))))
- .expectNextMatches(mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
- bucketName))
- .expectNextMatches(mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
- bucketName))
- .expectNextMatches(mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
- bucketName))
- .verifyComplete();
+ Function>, Flux> couchbaseConsumerFunction = context
+ .getBean("couchbaseConsumerFunction", Function.class);
+ StepVerifier.create(couchbaseConsumerFunction
+ .apply(Flux.just(
+ new GenericMessage<>(user1),
+ new GenericMessage<>(user2),
+ new GenericMessage<>(user3))))
+ .expectNextMatches(
+ mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
+ bucketName))
+ .expectNextMatches(
+ mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
+ bucketName))
+ .expectNextMatches(
+ mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
+ bucketName))
+ .verifyComplete();
- List users = cluster.query("SELECT name,email from test").rowsAs(User.class);
- assertThat(users).containsExactlyInAnyOrder(user1, user2, user3);
- }
+ List users = cluster.query("SELECT name,email from test").rowsAs(User.class);
+ assertThat(users).containsExactlyInAnyOrder(user1, user2, user3);
+ });
}
@Test
void customBucketExpression() {
- try (ConfigurableApplicationContext context = applicationBuilder
- .properties(Map.of(
- "couchbase.consumer.bucketExpression", "headers.bucketName",
- "couchbase.consumer.keyExpression", "payload.email"))
- .run()) {
- CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
- Cluster cluster = context.getBean(Cluster.class);
- String bucketName = "test";
- MessageBuilder.withPayload(new User("David", "david@david.com"))
- .copyHeaders(Map.of("bucketName", bucketName)).build();
- Function>, Flux> couchbaseConsumerFunction = context
- .getBean("couchbaseConsumerFunction", Function.class);
- Message> message = MessageBuilder.withPayload(new User("David", "david@david.com"))
- .copyHeaders(Map.of("bucketName", bucketName)).build();
- StepVerifier.create(couchbaseConsumerFunction.apply(Flux.just(message)))
- .expectNextMatches(
- (MutationResult mutationResult) -> mutationResult.mutationToken().get().bucketName().equals(
- bucketName))
- .verifyComplete();
+ applicationContextRunner.withPropertyValues(
+ "couchbase.consumer.bucketExpression=headers.bucketName",
+ "couchbase.consumer.keyExpression=payload.email")
+ .run(context -> {
+ CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
+ Cluster cluster = context.getBean(Cluster.class);
+ String bucketName = "test";
+ MessageBuilder.withPayload(new User("David", "david@david.com"))
+ .copyHeaders(Map.of("bucketName", bucketName)).build();
+ Function>, Flux> couchbaseConsumerFunction = context
+ .getBean("couchbaseConsumerFunction", Function.class);
+ Message> message = MessageBuilder.withPayload(new User("David", "david@david.com"))
+ .copyHeaders(Map.of("bucketName", bucketName)).build();
+ StepVerifier.create(couchbaseConsumerFunction.apply(Flux.just(message)))
+ .expectNextMatches(
+ (MutationResult mutationResult) -> mutationResult.mutationToken().get().bucketName()
+ .equals(
+ bucketName))
+ .verifyComplete();
- User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com").contentAs(User.class);
- assertThat(saved.getName()).isEqualTo("David");
- }
+ User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com")
+ .contentAs(User.class);
+ assertThat(saved.getName()).isEqualTo("David");
+ });
}
@Test
void customValueExpression() {
- try (ConfigurableApplicationContext context = applicationBuilder
- .properties(Map.of(
- "couchbase.consumer.bucketExpression", "'test'",
- "couchbase.consumer.valueExpression", "payload.user",
- "couchbase.consumer.keyExpression", "payload.user.email"))
- .run()) {
- CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
- Cluster cluster = context.getBean(Cluster.class);
- String bucketName = properties.getBucketExpression().getValue(String.class);
- Function>, Flux> couchbaseConsumerFunction = context
- .getBean("couchbaseConsumerFunction", Function.class);
- StepVerifier.create(couchbaseConsumerFunction
- .apply(Flux.just(new GenericMessage<>(Map.of("user", new User("David", "david@david.com"))))))
- .expectNextMatches(mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
- bucketName))
- .verifyComplete();
+ applicationContextRunner.withPropertyValues(
+ "couchbase.consumer.bucketExpression='test'",
+ "couchbase.consumer.valueExpression=payload.user",
+ "couchbase.consumer.keyExpression=payload.user.email")
+ .run(context -> {
+ CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
+ Cluster cluster = context.getBean(Cluster.class);
+ String bucketName = properties.getBucketExpression().getValue(String.class);
+ Function>, Flux> couchbaseConsumerFunction = context
+ .getBean("couchbaseConsumerFunction", Function.class);
+ StepVerifier.create(couchbaseConsumerFunction
+ .apply(Flux
+ .just(new GenericMessage<>(Map.of("user", new User("David", "david@david.com"))))))
+ .expectNextMatches(
+ mutationResult -> mutationResult.mutationToken().get().bucketName().equals(
+ bucketName))
+ .verifyComplete();
- Bucket bucket = cluster.bucket(bucketName);
- User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com").contentAs(User.class);
- assertThat(saved.getName()).isEqualTo("David");
- }
+ Bucket bucket = cluster.bucket(bucketName);
+ User saved = cluster.bucket(bucketName).defaultCollection().get("david@david.com")
+ .contentAs(User.class);
+ assertThat(saved.getName()).isEqualTo("David");
+ });
}
@Test
void bucketDoesNotExistShouldThrowException() {
- try (ConfigurableApplicationContext context = applicationBuilder
- .properties(Map.of(
- "couchbase.consumer.bucketExpression", "'users'",
- "couchbase.consumer.keyExpression", "payload.email"))
- .run()) {
- CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
- Function>, Flux> couchbaseConsumerFunction = context
- .getBean("couchbaseConsumerFunction", Function.class);
- StepVerifier.create(couchbaseConsumerFunction
- .apply(Flux.just(new GenericMessage<>(new User("David", "david@david.com")))))
- .expectErrorMatches(e -> e.toString().contains("BUCKET_NOT_AVAILABLE"))
- .verify();
- }
+ applicationContextRunner.withPropertyValues(
+ "couchbase.consumer.bucketExpression='users'",
+ "couchbase.consumer.keyExpression=payload.email")
+ .run(context -> {
+ CouchbaseConsumerProperties properties = context.getBean(CouchbaseConsumerProperties.class);
+ Function>, Flux> couchbaseConsumerFunction = context
+ .getBean("couchbaseConsumerFunction", Function.class);
+ StepVerifier.create(couchbaseConsumerFunction
+ .apply(Flux.just(new GenericMessage<>(new User("David", "david@david.com")))))
+ .expectErrorMatches(e -> e.toString().contains("BUCKET_NOT_AVAILABLE"))
+ .verify();
+ });
}
@SpringBootApplication
@@ -239,54 +229,4 @@ public class CouchbaseConsumerTests {
cluster.disconnect();
}
}
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- static class User {
- private String name;
-
- private String email;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getEmail() {
- return email;
- }
-
- public void setEmail(String email) {
- this.email = email;
- }
-
- public User() {
- }
-
- User(String name, String email) {
- this.name = name;
- this.email = email;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- User user = (User) o;
- return Objects.equals(name, user.name) &&
- Objects.equals(email, user.email);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, email);
- }
- }
-
}
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/domain/User.java b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/domain/User.java
new file mode 100644
index 0000000..bc86c55
--- /dev/null
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/domain/User.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.spring.example.couchbase.consumer.domain;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User {
+ private String name;
+
+ private String email;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public void setEmail(String email) {
+ this.email = email;
+ }
+
+ public User() {
+ }
+
+ public User(String name, String email) {
+ this.name = name;
+ this.email = email;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ User user = (User) o;
+ return Objects.equals(name, user.name) &&
+ Objects.equals(email, user.email);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, email);
+ }
+}
\ No newline at end of file
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
index 845bf52..54cc9d6 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
@@ -14,10 +14,6 @@
couchbase-sink
Demo Couchbase Sink
-
- !integration
-
-
io.spring.example