diff --git a/cassandra/reactive/pom.xml b/cassandra/reactive/pom.xml
index dc86fae7..5816705d 100644
--- a/cassandra/reactive/pom.xml
+++ b/cassandra/reactive/pom.xml
@@ -28,6 +28,12 @@
rxjava-reactive-streams
+
+ io.projectreactor
+ reactor-test
+ test
+
+
${project.groupId}
spring-data-cassandra-example-utils
diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java
index 66baaddb..53986d01 100644
--- a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java
+++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java
@@ -20,10 +20,10 @@ import static org.assertj.core.api.Assertions.*;
import example.springdata.cassandra.util.CassandraKeyspace;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import rx.RxReactiveStreams;
-import java.util.concurrent.CountDownLatch;
-
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -52,14 +52,14 @@ public class ReactiveCassandraTemplateIntegrationTest {
@Before
public void setUp() {
- template.truncate(Person.class) //
+ Flux truncateAndInsert = template.truncate(Person.class) //
.thenMany(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27))) //
- .flatMap(template::insert) //
- .then() //
- .block();
+ .flatMap(template::insert);
+
+ StepVerifier.create(truncateAndInsert).expectNextCount(4).verifyComplete();
}
/**
@@ -67,22 +67,18 @@ public class ReactiveCassandraTemplateIntegrationTest {
* the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
- public void shouldInsertAndCountData() throws Exception {
+ public void shouldInsertAndCountData() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- template.count(Person.class) //
+ Mono saveAndCount = template.count(Person.class) //
.doOnNext(System.out::println) //
.thenMany(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))
.flatMap(template::insert) //
.last() //
.flatMap(v -> template.count(Person.class)) //
- .doOnNext(System.out::println) //
- .doOnTerminate(countDownLatch::countDown) //
- .subscribe();
+ .doOnNext(System.out::println);
- countDownLatch.await();
+ StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
}
/**
diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java
index 2c962f1d..5fa46420 100644
--- a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java
+++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java
@@ -15,14 +15,10 @@
*/
package example.springdata.cassandra.people;
-import static org.assertj.core.api.Assertions.*;
-
import example.springdata.cassandra.util.CassandraKeyspace;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import reactor.test.StepVerifier;
import org.junit.Before;
import org.junit.ClassRule;
@@ -51,34 +47,30 @@ public class ReactivePersonRepositoryIntegrationTest {
@Before
public void setUp() {
- repository.deleteAll() //
+ Flux deleteAndInsert = repository.deleteAll() //
.thenMany(repository.saveAll(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
- new Person("Jesse", "Pinkman", 27))))
- .then() //
- .block();
+ new Person("Jesse", "Pinkman", 27))));
+
+ StepVerifier.create(deleteAndInsert).expectNextCount(4).verifyComplete();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
*/
@Test
- public void shouldInsertAndCountData() throws Exception {
+ public void shouldInsertAndCountData() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- repository.count() //
+ Mono saveAndCount = repository.count() //
.doOnNext(System.out::println) //
.thenMany(repository.saveAll(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> repository.count()) //
- .doOnNext(System.out::println) //
- .doOnTerminate(countDownLatch::countDown) //
- .subscribe();
+ .doOnNext(System.out::println);
- countDownLatch.await();
+ StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
}
/**
@@ -86,17 +78,11 @@ public class ReactivePersonRepositoryIntegrationTest {
* prefetch define the amount of fetched records.
*/
@Test
- public void shouldPerformConversionBeforeResultProcessing() throws Exception {
+ public void shouldPerformConversionBeforeResultProcessing() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- repository.findAll() //
- .doOnNext(System.out::println) //
- .doOnComplete(countDownLatch::countDown) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
-
- countDownLatch.await();
+ StepVerifier.create(repository.findAll().doOnNext(System.out::println)) //
+ .expectNextCount(4) //
+ .verifyComplete();
}
/**
@@ -104,12 +90,7 @@ public class ReactivePersonRepositoryIntegrationTest {
*/
@Test
public void shouldQueryDataWithQueryDerivation() {
-
- List whites = repository.findByLastname("White") //
- .collectList() //
- .block();
-
- assertThat(whites).hasSize(2);
+ StepVerifier.create(repository.findByLastname("White")).expectNextCount(2).verifyComplete();
}
/**
@@ -117,11 +98,7 @@ public class ReactivePersonRepositoryIntegrationTest {
*/
@Test
public void shouldQueryDataWithStringQuery() {
-
- Person heisenberg = repository.findByFirstnameInAndLastname("Walter", "White") //
- .block();
-
- assertThat(heisenberg).isNotNull();
+ StepVerifier.create(repository.findByFirstnameInAndLastname("Walter", "White")).expectNextCount(1).verifyComplete();
}
/**
@@ -129,12 +106,7 @@ public class ReactivePersonRepositoryIntegrationTest {
*/
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
-
- List whites = repository.findByLastname(Mono.just("White")) //
- .collectList() //
- .block();
-
- assertThat(whites).hasSize(2);
+ StepVerifier.create(repository.findByLastname(Mono.just("White"))).expectNextCount(2).verifyComplete();
}
/**
@@ -143,10 +115,9 @@ public class ReactivePersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
- Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") //
- .block();
-
- assertThat(heisenberg).isNotNull();
+ StepVerifier.create(repository.findByFirstnameAndLastname(Mono.just("Walter"), "White")) //
+ .expectNextCount(1) //
+ .verifyComplete();
}
}
diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java
index 612f09ec..2ec9ab63 100644
--- a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java
+++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java
@@ -15,16 +15,11 @@
*/
package example.springdata.cassandra.people;
-import static org.assertj.core.api.Assertions.*;
-
import example.springdata.cassandra.util.CassandraKeyspace;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -60,16 +55,16 @@ public class RxJava2PersonRepositoryIntegrationTest {
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27)));
- deleteAll.andThen(save).blockingLast();
+ deleteAll.andThen(save).test().await().assertNoErrors();
}
/**
- * This sample performs a count, inserts data and performs a count again using reactive operator chaining.
+ * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
+ * the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
- public void shouldInsertAndCountData() throws Exception {
+ public void shouldInsertAndCountData() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
repository.count() //
.doOnSuccess(System.out::println) //
@@ -80,10 +75,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
.toSingle() //
.flatMap(v -> repository.count()) //
.doOnSuccess(System.out::println) //
- .doAfterTerminate(countDownLatch::countDown) //
- .subscribe();
-
- countDownLatch.await();
+ .test() //
+ .awaitCount(1) //
+ .assertValue(6L) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -91,17 +87,14 @@ public class RxJava2PersonRepositoryIntegrationTest {
* prefetch define the amount of fetched records.
*/
@Test
- public void shouldPerformConversionBeforeResultProcessing() throws Exception {
-
- CountDownLatch countDownLatch = new CountDownLatch(1);
+ public void shouldPerformConversionBeforeResultProcessing() {
repository.findAll() //
.doOnNext(System.out::println) //
- .doOnEach(it -> countDownLatch.countDown()) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
-
- countDownLatch.await();
+ .test() //
+ .awaitCount(4) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -110,11 +103,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithQueryDerivation() {
- List whites = repository.findByLastname("White") //
- .toList() //
- .blockingGet();
-
- assertThat(whites).hasSize(2);
+ repository.findByLastname("White") //
+ .test() //
+ .awaitCount(2) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -123,10 +116,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithStringQuery() {
- Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
- .blockingGet();
-
- assertThat(heisenberg).isNotNull();
+ repository.findByFirstnameAndLastname("Walter", "White") //
+ .test() //
+ .awaitCount(1) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -135,11 +129,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
- List whites = repository.findByLastname(Single.just("White")) //
- .toList() //
- .blockingGet();
-
- assertThat(whites).hasSize(2);
+ repository.findByLastname(Single.just("White")) //
+ .test() //
+ .awaitCount(2) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -148,9 +142,10 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
- Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
- .blockingGet();
-
- assertThat(heisenberg).isNotNull();
+ repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
+ .test() //
+ .awaitCount(1) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
}
diff --git a/mongodb/reactive/pom.xml b/mongodb/reactive/pom.xml
index d0777f54..d00ca95b 100644
--- a/mongodb/reactive/pom.xml
+++ b/mongodb/reactive/pom.xml
@@ -16,7 +16,7 @@
-
+
org.springframework.boot
spring-boot-starter-data-mongodb-reactive
@@ -33,6 +33,12 @@
rxjava-reactive-streams
+
+ io.projectreactor
+ reactor-test
+ test
+
+
diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java
index 31886e7b..6e5807f8 100644
--- a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java
+++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016 the original author or authors.
+ * Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,9 +19,10 @@ import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import rx.RxReactiveStreams;
-import java.util.concurrent.CountDownLatch;
+import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
@@ -47,19 +48,15 @@ public class ReactiveMongoTemplateIntegrationTest {
@Before
public void setUp() {
- template.collectionExists(Person.class) //
- .flatMap(exists -> exists ? template.dropCollection(Person.class) : Mono.just(exists)) //
- .flatMap(exists -> template.createCollection(Person.class)) //
- .then() //
- .block();
+ StepVerifier.create(template.dropCollection(Person.class)).verifyComplete();
- template
+ Flux insertAll = template
.insertAll(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
- new Person("Jesse", "Pinkman", 27)).collectList())
- .then() //
- .block();
+ new Person("Jesse", "Pinkman", 27)).collectList());
+
+ StepVerifier.create(insertAll).expectNextCount(4).verifyComplete();
}
/**
@@ -67,29 +64,24 @@ public class ReactiveMongoTemplateIntegrationTest {
* the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
- public void shouldInsertAndCountData() throws Exception {
+ public void shouldInsertAndCountData() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- template.count(new Query(), Person.class) //
+ Mono count = template.count(new Query(), Person.class) //
.doOnNext(System.out::println) //
- .thenMany(template.save(Flux.just(new Person("Hank", "Schrader", 43), //
+ .thenMany(template.insertAll(Arrays.asList(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> template.count(new Query(), Person.class)) //
- .doOnNext(System.out::println) //
- .doOnSuccess(it -> countDownLatch.countDown()) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
+ .doOnNext(System.out::println);//
- countDownLatch.await();
+ StepVerifier.create(count).expectNext(6L).verifyComplete();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
- public void convertReactorTypesToRxJava2() throws Exception {
+ public void convertReactorTypesToRxJava2() {
Flux flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class);
diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java
index 090bbd87..e8434abe 100644
--- a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java
+++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java
@@ -20,12 +20,12 @@ import static org.assertj.core.api.Assertions.*;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
-import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
+import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,6 +35,8 @@ import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.test.context.junit4.SpringRunner;
+import com.mongodb.reactivestreams.client.MongoCollection;
+
/**
* Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators.
*
@@ -50,60 +52,50 @@ public class ReactivePersonRepositoryIntegrationTest {
@Before
public void setUp() {
- operations.collectionExists(Person.class) //
+ Mono> recreateCollection = operations.collectionExists(Person.class) //
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
.then(operations.createCollection(Person.class, CollectionOptions.empty() //
.size(1024 * 1024) //
.maxDocuments(100) //
- .capped())) //
- .block();
+ .capped()));
- repository
- .saveAll(Flux.just(new Person("Walter", "White", 50), //
+ StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete();
+
+ Flux insertAll = operations.insertAll(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
- new Person("Jesse", "Pinkman", 27))) //
- .then() //
- .block();
+ new Person("Jesse", "Pinkman", 27)).collectList());
+
+ StepVerifier.create(insertAll).expectNextCount(4).verifyComplete();
}
/**
- * This sample performs a count, inserts data and performs a count again using reactive operator chaining.
+ * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
+ * the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
- public void shouldInsertAndCountData() throws Exception {
+ public void shouldInsertAndCountData() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- repository.count() //
+ Mono saveAndCount = repository.count() //
.doOnNext(System.out::println) //
.thenMany(repository.saveAll(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> repository.count()) //
- .doOnNext(System.out::println) //
- .doOnSuccess(it -> countDownLatch.countDown()) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
+ .doOnNext(System.out::println);
- countDownLatch.await();
+ StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
- public void shouldPerformConversionBeforeResultProcessing() throws Exception {
+ public void shouldPerformConversionBeforeResultProcessing() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
-
- repository.findAll() //
- .doOnNext(System.out::println) //
- .doOnComplete(countDownLatch::countDown) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
-
- countDownLatch.await();
+ StepVerifier.create(repository.findAll().doOnNext(System.out::println)) //
+ .expectNextCount(4) //
+ .verifyComplete();
}
/**
@@ -123,15 +115,21 @@ public class ReactivePersonRepositoryIntegrationTest {
Thread.sleep(100);
- repository.save(new Person("Tuco", "Salamanca", 33)).subscribe();
+ StepVerifier.create(repository.save(new Person("Tuco", "Salamanca", 33))) //
+ .expectNextCount(1) //
+ .verifyComplete();
Thread.sleep(100);
- repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe();
+ StepVerifier.create(repository.save(new Person("Mike", "Ehrmantraut", 62))) //
+ .expectNextCount(1) //
+ .verifyComplete();
Thread.sleep(100);
disposable.dispose();
- repository.save(new Person("Gus", "Fring", 53)).subscribe();
+ StepVerifier.create(repository.save(new Person("Gus", "Fring", 53))) //
+ .expectNextCount(1) //
+ .verifyComplete();
Thread.sleep(100);
assertThat(people).hasSize(6);
@@ -142,12 +140,7 @@ public class ReactivePersonRepositoryIntegrationTest {
*/
@Test
public void shouldQueryDataWithQueryDerivation() {
-
- List whites = repository.findByLastname("White") //
- .collectList() //
- .block();
-
- assertThat(whites).hasSize(2);
+ StepVerifier.create(repository.findByLastname("White")).expectNextCount(2).verifyComplete();
}
/**
@@ -155,11 +148,7 @@ public class ReactivePersonRepositoryIntegrationTest {
*/
@Test
public void shouldQueryDataWithStringQuery() {
-
- Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
- .block();
-
- assertThat(heisenberg).isNotNull();
+ StepVerifier.create(repository.findByFirstnameAndLastname("Walter", "White")).expectNextCount(1).verifyComplete();
}
/**
@@ -167,12 +156,7 @@ public class ReactivePersonRepositoryIntegrationTest {
*/
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
-
- List whites = repository.findByLastname(Mono.just("White")) //
- .collectList() //
- .block();
-
- assertThat(whites).hasSize(2);
+ StepVerifier.create(repository.findByLastname(Mono.just("White"))).expectNextCount(2).verifyComplete();
}
/**
@@ -181,9 +165,8 @@ public class ReactivePersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
- Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") //
- .block();
-
- assertThat(heisenberg).isNotNull();
+ StepVerifier.create(repository.findByFirstnameAndLastname(Mono.just("Walter"), "White")) //
+ .expectNextCount(1) //
+ .verifyComplete();
}
}
diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java
index 88884ed6..bc05b2e1 100644
--- a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java
+++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java
@@ -21,12 +21,12 @@ import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
-import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
+import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,6 +36,8 @@ import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.test.context.junit4.SpringRunner;
+import com.mongodb.reactivestreams.client.MongoCollection;
+
/**
* Integration test for {@link RxJava2PersonRepository} using RxJava2 types. Note that {@link ReactiveMongoOperations}
* is only available using Project Reactor types as the native Template API implementation does not come in multiple
@@ -55,28 +57,31 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Before
public void setUp() {
- operations.collectionExists(Person.class) //
+ Mono> recreateCollection = operations.collectionExists(Person.class) //
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
.then(operations.createCollection(Person.class, CollectionOptions.empty() //
.size(1024 * 1024) //
.maxDocuments(100) //
- .capped())) //
- .block();
+ .capped()));
+
+ StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete();
repository.saveAll(Flowable.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27))) //
- .blockingLast();
+ .test() //
+ .awaitCount(4) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
- * This sample performs a count, inserts data and performs a count again using reactive operator chaining.
+ * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
+ * the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
- public void shouldInsertAndCountData() throws Exception {
-
- CountDownLatch countDownLatch = new CountDownLatch(1);
+ public void shouldInsertAndCountData() {
Flowable people = Flowable.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62));
@@ -89,28 +94,26 @@ public class RxJava2PersonRepositoryIntegrationTest {
.toSingle() //
.flatMap(v -> repository.count()) //
.doOnSuccess(System.out::println) //
- .doAfterTerminate(countDownLatch::countDown) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
-
- countDownLatch.await();
+ .test() //
+ .awaitCount(1) //
+ .assertValue(6L) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
- public void shouldPerformConversionBeforeResultProcessing() throws Exception {
-
- CountDownLatch countDownLatch = new CountDownLatch(1);
+ public void shouldPerformConversionBeforeResultProcessing() {
repository.findAll() //
.doOnNext(System.out::println) //
- .doOnComplete(countDownLatch::countDown) //
- .doOnError(throwable -> countDownLatch.countDown()) //
- .subscribe();
+ .test() //
+ .awaitCount(4) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
- countDownLatch.await();
}
/**
@@ -130,15 +133,15 @@ public class RxJava2PersonRepositoryIntegrationTest {
Thread.sleep(100);
- repository.save(new Person("Tuco", "Salamanca", 33)).subscribe();
+ repository.save(new Person("Tuco", "Salamanca", 33)).test().awaitTerminalEvent();
Thread.sleep(100);
- repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe();
+ repository.save(new Person("Mike", "Ehrmantraut", 62)).test().awaitTerminalEvent();
Thread.sleep(100);
subscription.dispose();
- repository.save(new Person("Gus", "Fring", 53)).subscribe();
+ repository.save(new Person("Gus", "Fring", 53)).test().awaitTerminalEvent();
Thread.sleep(100);
assertThat(people).hasSize(6);
@@ -150,11 +153,12 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithQueryDerivation() {
- List whites = repository.findByLastname("White") //
- .toList() //
- .blockingGet();
+ repository.findByLastname("White") //
+ .test() //
+ .awaitCount(2) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
- assertThat(whites).hasSize(2);
}
/**
@@ -163,10 +167,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithStringQuery() {
- Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
- .blockingGet();
-
- assertThat(heisenberg).isNotNull();
+ repository.findByFirstnameAndLastname("Walter", "White") //
+ .test() //
+ .awaitCount(1) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -175,11 +180,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
- List whites = repository.findByLastname(Single.just("White")) //
- .toList() //
- .blockingGet();
-
- assertThat(whites).hasSize(2);
+ repository.findByLastname(Single.just("White")) //
+ .test() //
+ .awaitCount(2) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
/**
@@ -188,9 +193,10 @@ public class RxJava2PersonRepositoryIntegrationTest {
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
- Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
- .blockingGet();
-
- assertThat(heisenberg).isNotNull();
+ repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
+ .test() //
+ .awaitCount(1) //
+ .assertNoErrors() //
+ .awaitTerminalEvent();
}
}
diff --git a/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java b/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java
index 512ebb23..cf58cca2 100644
--- a/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java
+++ b/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java
@@ -19,6 +19,7 @@ import example.springdata.redis.RedisTestConfiguration;
import example.springdata.redis.test.util.RequiresRedisServer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import java.nio.ByteBuffer;
import java.time.Duration;
@@ -37,6 +38,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveStringCommands.SetCommand;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
+import org.springframework.data.redis.util.ByteUtils;
import org.springframework.test.context.junit4.SpringRunner;
/**
@@ -75,13 +77,14 @@ public class KeyCommandsTests {
generateRandomKeys(50);
- this.connection.keyCommands() //
+ Mono keyCount = connection.keyCommands() //
.keys(ByteBuffer.wrap(serializer.serialize(KEY_PATTERN))) //
.flatMapMany(Flux::fromIterable) //
.doOnNext(byteBuffer -> System.out.println(toString(byteBuffer))) //
.count() //
- .doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count))) //
- .block();
+ .doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count)));
+
+ StepVerifier.create(keyCount).expectNext(50L).verifyComplete();
}
/**
@@ -90,19 +93,19 @@ public class KeyCommandsTests {
@Test
public void storeToListAndPop() {
- Mono popResult = this.connection.listCommands()
+ Mono popResult = connection.listCommands()
.brPop(Collections.singletonList(ByteBuffer.wrap("list".getBytes())), Duration.ofSeconds(5));
- Mono llen = this.connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes()));
+ Mono llen = connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes()));
- this.connection.listCommands() //
+ Mono popAndLlen = connection.listCommands() //
.rPush(ByteBuffer.wrap("list".getBytes()), Collections.singletonList(ByteBuffer.wrap("item".getBytes())))
.flatMap(l -> popResult) //
.doOnNext(result -> System.out.println(toString(result.getValue()))) //
.flatMap(result -> llen) //
- .doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count))) //
- .then() //
- .block();
+ .doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count)));//
+
+ StepVerifier.create(popAndLlen).expectNext(0L).verifyComplete();
}
private void generateRandomKeys(int nrKeys) {
@@ -113,17 +116,11 @@ public class KeyCommandsTests {
.map(key -> SetCommand.set(key) //
.value(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes())));
- this.connection.stringCommands() //
- .set(generator) //
- .then() //
- .block();
+ StepVerifier.create(connection.stringCommands().set(generator)).expectNextCount(nrKeys).verifyComplete();
}
private static String toString(ByteBuffer byteBuffer) {
-
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- return new String(bytes);
+ return new String(ByteUtils.getBytes(byteBuffer));
}
}