#266 - Refactor reactive tests to use StepVerifier and test methods.
We now use StepVerifier and RxJava's .test() methods instead of .block() calls. Using blocking methods is an anti pattern which should be avoided within tests. Test API comes with timeouts and protects tests from never completing.
This commit is contained in:
@@ -28,6 +28,12 @@
|
||||
<artifactId>rxjava-reactive-streams</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>spring-data-cassandra-example-utils</artifactId>
|
||||
|
||||
@@ -20,10 +20,10 @@ import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import example.springdata.cassandra.util.CassandraKeyspace;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
import rx.RxReactiveStreams;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
@@ -52,14 +52,14 @@ public class ReactiveCassandraTemplateIntegrationTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
template.truncate(Person.class) //
|
||||
Flux<Person> truncateAndInsert = template.truncate(Person.class) //
|
||||
.thenMany(Flux.just(new Person("Walter", "White", 50), //
|
||||
new Person("Skyler", "White", 45), //
|
||||
new Person("Saul", "Goodman", 42), //
|
||||
new Person("Jesse", "Pinkman", 27))) //
|
||||
.flatMap(template::insert) //
|
||||
.then() //
|
||||
.block();
|
||||
.flatMap(template::insert);
|
||||
|
||||
StepVerifier.create(truncateAndInsert).expectNextCount(4).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -67,22 +67,18 @@ public class ReactiveCassandraTemplateIntegrationTest {
|
||||
* the two counts ({@code 4} and {@code 6}) to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldInsertAndCountData() throws Exception {
|
||||
public void shouldInsertAndCountData() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
template.count(Person.class) //
|
||||
Mono<Long> saveAndCount = template.count(Person.class) //
|
||||
.doOnNext(System.out::println) //
|
||||
.thenMany(Flux.just(new Person("Hank", "Schrader", 43), //
|
||||
new Person("Mike", "Ehrmantraut", 62)))
|
||||
.flatMap(template::insert) //
|
||||
.last() //
|
||||
.flatMap(v -> template.count(Person.class)) //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnTerminate(countDownLatch::countDown) //
|
||||
.subscribe();
|
||||
.doOnNext(System.out::println);
|
||||
|
||||
countDownLatch.await();
|
||||
StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -15,14 +15,10 @@
|
||||
*/
|
||||
package example.springdata.cassandra.people;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import example.springdata.cassandra.util.CassandraKeyspace;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
@@ -51,34 +47,30 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
repository.deleteAll() //
|
||||
Flux<Person> deleteAndInsert = repository.deleteAll() //
|
||||
.thenMany(repository.saveAll(Flux.just(new Person("Walter", "White", 50), //
|
||||
new Person("Skyler", "White", 45), //
|
||||
new Person("Saul", "Goodman", 42), //
|
||||
new Person("Jesse", "Pinkman", 27))))
|
||||
.then() //
|
||||
.block();
|
||||
new Person("Jesse", "Pinkman", 27))));
|
||||
|
||||
StepVerifier.create(deleteAndInsert).expectNextCount(4).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
|
||||
*/
|
||||
@Test
|
||||
public void shouldInsertAndCountData() throws Exception {
|
||||
public void shouldInsertAndCountData() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
repository.count() //
|
||||
Mono<Long> saveAndCount = repository.count() //
|
||||
.doOnNext(System.out::println) //
|
||||
.thenMany(repository.saveAll(Flux.just(new Person("Hank", "Schrader", 43), //
|
||||
new Person("Mike", "Ehrmantraut", 62)))) //
|
||||
.last() //
|
||||
.flatMap(v -> repository.count()) //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnTerminate(countDownLatch::countDown) //
|
||||
.subscribe();
|
||||
.doOnNext(System.out::println);
|
||||
|
||||
countDownLatch.await();
|
||||
StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -86,17 +78,11 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
* prefetch define the amount of fetched records.
|
||||
*/
|
||||
@Test
|
||||
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
|
||||
public void shouldPerformConversionBeforeResultProcessing() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
repository.findAll() //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnComplete(countDownLatch::countDown) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
|
||||
countDownLatch.await();
|
||||
StepVerifier.create(repository.findAll().doOnNext(System.out::println)) //
|
||||
.expectNextCount(4) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -104,12 +90,7 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void shouldQueryDataWithQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname("White") //
|
||||
.collectList() //
|
||||
.block();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
StepVerifier.create(repository.findByLastname("White")).expectNextCount(2).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -117,11 +98,7 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void shouldQueryDataWithStringQuery() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameInAndLastname("Walter", "White") //
|
||||
.block();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
StepVerifier.create(repository.findByFirstnameInAndLastname("Walter", "White")).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -129,12 +106,7 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void shouldQueryDataWithDeferredQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname(Mono.just("White")) //
|
||||
.collectList() //
|
||||
.block();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
StepVerifier.create(repository.findByLastname(Mono.just("White"))).expectNextCount(2).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -143,10 +115,9 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") //
|
||||
.block();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
StepVerifier.create(repository.findByFirstnameAndLastname(Mono.just("Walter"), "White")) //
|
||||
.expectNextCount(1) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,16 +15,11 @@
|
||||
*/
|
||||
package example.springdata.cassandra.people;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import example.springdata.cassandra.util.CassandraKeyspace;
|
||||
import io.reactivex.Completable;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.Single;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
@@ -60,16 +55,16 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
new Person("Saul", "Goodman", 42), //
|
||||
new Person("Jesse", "Pinkman", 27)));
|
||||
|
||||
deleteAll.andThen(save).blockingLast();
|
||||
deleteAll.andThen(save).test().await().assertNoErrors();
|
||||
}
|
||||
|
||||
/**
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
|
||||
* the two counts ({@code 4} and {@code 6}) to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldInsertAndCountData() throws Exception {
|
||||
public void shouldInsertAndCountData() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
repository.count() //
|
||||
.doOnSuccess(System.out::println) //
|
||||
@@ -80,10 +75,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
.toSingle() //
|
||||
.flatMap(v -> repository.count()) //
|
||||
.doOnSuccess(System.out::println) //
|
||||
.doAfterTerminate(countDownLatch::countDown) //
|
||||
.subscribe();
|
||||
|
||||
countDownLatch.await();
|
||||
.test() //
|
||||
.awaitCount(1) //
|
||||
.assertValue(6L) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -91,17 +87,14 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
* prefetch define the amount of fetched records.
|
||||
*/
|
||||
@Test
|
||||
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
public void shouldPerformConversionBeforeResultProcessing() {
|
||||
|
||||
repository.findAll() //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnEach(it -> countDownLatch.countDown()) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
|
||||
countDownLatch.await();
|
||||
.test() //
|
||||
.awaitCount(4) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -110,11 +103,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname("White") //
|
||||
.toList() //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
repository.findByLastname("White") //
|
||||
.test() //
|
||||
.awaitCount(2) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -123,10 +116,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithStringQuery() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
repository.findByFirstnameAndLastname("Walter", "White") //
|
||||
.test() //
|
||||
.awaitCount(1) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -135,11 +129,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithDeferredQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname(Single.just("White")) //
|
||||
.toList() //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
repository.findByLastname(Single.just("White")) //
|
||||
.test() //
|
||||
.awaitCount(2) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -148,9 +142,10 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
|
||||
.test() //
|
||||
.awaitCount(1) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
|
||||
@@ -33,6 +33,12 @@
|
||||
<artifactId>rxjava-reactive-streams</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,9 +19,10 @@ import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
import rx.RxReactiveStreams;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@@ -47,19 +48,15 @@ public class ReactiveMongoTemplateIntegrationTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
template.collectionExists(Person.class) //
|
||||
.flatMap(exists -> exists ? template.dropCollection(Person.class) : Mono.just(exists)) //
|
||||
.flatMap(exists -> template.createCollection(Person.class)) //
|
||||
.then() //
|
||||
.block();
|
||||
StepVerifier.create(template.dropCollection(Person.class)).verifyComplete();
|
||||
|
||||
template
|
||||
Flux<Person> insertAll = template
|
||||
.insertAll(Flux.just(new Person("Walter", "White", 50), //
|
||||
new Person("Skyler", "White", 45), //
|
||||
new Person("Saul", "Goodman", 42), //
|
||||
new Person("Jesse", "Pinkman", 27)).collectList())
|
||||
.then() //
|
||||
.block();
|
||||
new Person("Jesse", "Pinkman", 27)).collectList());
|
||||
|
||||
StepVerifier.create(insertAll).expectNextCount(4).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -67,29 +64,24 @@ public class ReactiveMongoTemplateIntegrationTest {
|
||||
* the two counts ({@code 4} and {@code 6}) to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldInsertAndCountData() throws Exception {
|
||||
public void shouldInsertAndCountData() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
template.count(new Query(), Person.class) //
|
||||
Mono<Long> count = template.count(new Query(), Person.class) //
|
||||
.doOnNext(System.out::println) //
|
||||
.thenMany(template.save(Flux.just(new Person("Hank", "Schrader", 43), //
|
||||
.thenMany(template.insertAll(Arrays.asList(new Person("Hank", "Schrader", 43), //
|
||||
new Person("Mike", "Ehrmantraut", 62)))) //
|
||||
.last() //
|
||||
.flatMap(v -> template.count(new Query(), Person.class)) //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnSuccess(it -> countDownLatch.countDown()) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
.doOnNext(System.out::println);//
|
||||
|
||||
countDownLatch.await();
|
||||
StepVerifier.create(count).expectNext(6L).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that the all object conversions are performed before the results are printed to the console.
|
||||
*/
|
||||
@Test
|
||||
public void convertReactorTypesToRxJava2() throws Exception {
|
||||
public void convertReactorTypesToRxJava2() {
|
||||
|
||||
Flux<Person> flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class);
|
||||
|
||||
|
||||
@@ -20,12 +20,12 @@ import static org.assertj.core.api.Assertions.*;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@@ -35,6 +35,8 @@ import org.springframework.data.mongodb.core.CollectionOptions;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import com.mongodb.reactivestreams.client.MongoCollection;
|
||||
|
||||
/**
|
||||
* Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators.
|
||||
*
|
||||
@@ -50,60 +52,50 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
operations.collectionExists(Person.class) //
|
||||
Mono<MongoCollection<Document>> recreateCollection = operations.collectionExists(Person.class) //
|
||||
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
|
||||
.then(operations.createCollection(Person.class, CollectionOptions.empty() //
|
||||
.size(1024 * 1024) //
|
||||
.maxDocuments(100) //
|
||||
.capped())) //
|
||||
.block();
|
||||
.capped()));
|
||||
|
||||
repository
|
||||
.saveAll(Flux.just(new Person("Walter", "White", 50), //
|
||||
StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete();
|
||||
|
||||
Flux<Person> insertAll = operations.insertAll(Flux.just(new Person("Walter", "White", 50), //
|
||||
new Person("Skyler", "White", 45), //
|
||||
new Person("Saul", "Goodman", 42), //
|
||||
new Person("Jesse", "Pinkman", 27))) //
|
||||
.then() //
|
||||
.block();
|
||||
new Person("Jesse", "Pinkman", 27)).collectList());
|
||||
|
||||
StepVerifier.create(insertAll).expectNextCount(4).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
|
||||
* the two counts ({@code 4} and {@code 6}) to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldInsertAndCountData() throws Exception {
|
||||
public void shouldInsertAndCountData() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
repository.count() //
|
||||
Mono<Long> saveAndCount = repository.count() //
|
||||
.doOnNext(System.out::println) //
|
||||
.thenMany(repository.saveAll(Flux.just(new Person("Hank", "Schrader", 43), //
|
||||
new Person("Mike", "Ehrmantraut", 62)))) //
|
||||
.last() //
|
||||
.flatMap(v -> repository.count()) //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnSuccess(it -> countDownLatch.countDown()) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
.doOnNext(System.out::println);
|
||||
|
||||
countDownLatch.await();
|
||||
StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that the all object conversions are performed before the results are printed to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
|
||||
public void shouldPerformConversionBeforeResultProcessing() {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
repository.findAll() //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnComplete(countDownLatch::countDown) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
|
||||
countDownLatch.await();
|
||||
StepVerifier.create(repository.findAll().doOnNext(System.out::println)) //
|
||||
.expectNextCount(4) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -123,15 +115,21 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
|
||||
Thread.sleep(100);
|
||||
|
||||
repository.save(new Person("Tuco", "Salamanca", 33)).subscribe();
|
||||
StepVerifier.create(repository.save(new Person("Tuco", "Salamanca", 33))) //
|
||||
.expectNextCount(1) //
|
||||
.verifyComplete();
|
||||
Thread.sleep(100);
|
||||
|
||||
repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe();
|
||||
StepVerifier.create(repository.save(new Person("Mike", "Ehrmantraut", 62))) //
|
||||
.expectNextCount(1) //
|
||||
.verifyComplete();
|
||||
Thread.sleep(100);
|
||||
|
||||
disposable.dispose();
|
||||
|
||||
repository.save(new Person("Gus", "Fring", 53)).subscribe();
|
||||
StepVerifier.create(repository.save(new Person("Gus", "Fring", 53))) //
|
||||
.expectNextCount(1) //
|
||||
.verifyComplete();
|
||||
Thread.sleep(100);
|
||||
|
||||
assertThat(people).hasSize(6);
|
||||
@@ -142,12 +140,7 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void shouldQueryDataWithQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname("White") //
|
||||
.collectList() //
|
||||
.block();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
StepVerifier.create(repository.findByLastname("White")).expectNextCount(2).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -155,11 +148,7 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void shouldQueryDataWithStringQuery() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
|
||||
.block();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
StepVerifier.create(repository.findByFirstnameAndLastname("Walter", "White")).expectNextCount(1).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -167,12 +156,7 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void shouldQueryDataWithDeferredQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname(Mono.just("White")) //
|
||||
.collectList() //
|
||||
.block();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
StepVerifier.create(repository.findByLastname(Mono.just("White"))).expectNextCount(2).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -181,9 +165,8 @@ public class ReactivePersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") //
|
||||
.block();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
StepVerifier.create(repository.findByFirstnameAndLastname(Mono.just("Walter"), "White")) //
|
||||
.expectNextCount(1) //
|
||||
.verifyComplete();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,12 +21,12 @@ import io.reactivex.Flowable;
|
||||
import io.reactivex.Single;
|
||||
import io.reactivex.disposables.Disposable;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@@ -36,6 +36,8 @@ import org.springframework.data.mongodb.core.CollectionOptions;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import com.mongodb.reactivestreams.client.MongoCollection;
|
||||
|
||||
/**
|
||||
* Integration test for {@link RxJava2PersonRepository} using RxJava2 types. Note that {@link ReactiveMongoOperations}
|
||||
* is only available using Project Reactor types as the native Template API implementation does not come in multiple
|
||||
@@ -55,28 +57,31 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
operations.collectionExists(Person.class) //
|
||||
Mono<MongoCollection<Document>> recreateCollection = operations.collectionExists(Person.class) //
|
||||
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
|
||||
.then(operations.createCollection(Person.class, CollectionOptions.empty() //
|
||||
.size(1024 * 1024) //
|
||||
.maxDocuments(100) //
|
||||
.capped())) //
|
||||
.block();
|
||||
.capped()));
|
||||
|
||||
StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete();
|
||||
|
||||
repository.saveAll(Flowable.just(new Person("Walter", "White", 50), //
|
||||
new Person("Skyler", "White", 45), //
|
||||
new Person("Saul", "Goodman", 42), //
|
||||
new Person("Jesse", "Pinkman", 27))) //
|
||||
.blockingLast();
|
||||
.test() //
|
||||
.awaitCount(4) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
|
||||
* This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
|
||||
* the two counts ({@code 4} and {@code 6}) to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldInsertAndCountData() throws Exception {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
public void shouldInsertAndCountData() {
|
||||
|
||||
Flowable<Person> people = Flowable.just(new Person("Hank", "Schrader", 43), //
|
||||
new Person("Mike", "Ehrmantraut", 62));
|
||||
@@ -89,28 +94,26 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
.toSingle() //
|
||||
.flatMap(v -> repository.count()) //
|
||||
.doOnSuccess(System.out::println) //
|
||||
.doAfterTerminate(countDownLatch::countDown) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
|
||||
countDownLatch.await();
|
||||
.test() //
|
||||
.awaitCount(1) //
|
||||
.assertValue(6L) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that the all object conversions are performed before the results are printed to the console.
|
||||
*/
|
||||
@Test
|
||||
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
public void shouldPerformConversionBeforeResultProcessing() {
|
||||
|
||||
repository.findAll() //
|
||||
.doOnNext(System.out::println) //
|
||||
.doOnComplete(countDownLatch::countDown) //
|
||||
.doOnError(throwable -> countDownLatch.countDown()) //
|
||||
.subscribe();
|
||||
.test() //
|
||||
.awaitCount(4) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
|
||||
countDownLatch.await();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -130,15 +133,15 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
|
||||
Thread.sleep(100);
|
||||
|
||||
repository.save(new Person("Tuco", "Salamanca", 33)).subscribe();
|
||||
repository.save(new Person("Tuco", "Salamanca", 33)).test().awaitTerminalEvent();
|
||||
Thread.sleep(100);
|
||||
|
||||
repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe();
|
||||
repository.save(new Person("Mike", "Ehrmantraut", 62)).test().awaitTerminalEvent();
|
||||
Thread.sleep(100);
|
||||
|
||||
subscription.dispose();
|
||||
|
||||
repository.save(new Person("Gus", "Fring", 53)).subscribe();
|
||||
repository.save(new Person("Gus", "Fring", 53)).test().awaitTerminalEvent();
|
||||
Thread.sleep(100);
|
||||
|
||||
assertThat(people).hasSize(6);
|
||||
@@ -150,11 +153,12 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname("White") //
|
||||
.toList() //
|
||||
.blockingGet();
|
||||
repository.findByLastname("White") //
|
||||
.test() //
|
||||
.awaitCount(2) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -163,10 +167,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithStringQuery() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
repository.findByFirstnameAndLastname("Walter", "White") //
|
||||
.test() //
|
||||
.awaitCount(1) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,11 +180,11 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithDeferredQueryDerivation() {
|
||||
|
||||
List<Person> whites = repository.findByLastname(Single.just("White")) //
|
||||
.toList() //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(whites).hasSize(2);
|
||||
repository.findByLastname(Single.just("White")) //
|
||||
.test() //
|
||||
.awaitCount(2) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -188,9 +193,10 @@ public class RxJava2PersonRepositoryIntegrationTest {
|
||||
@Test
|
||||
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
|
||||
|
||||
Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
|
||||
.blockingGet();
|
||||
|
||||
assertThat(heisenberg).isNotNull();
|
||||
repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
|
||||
.test() //
|
||||
.awaitCount(1) //
|
||||
.assertNoErrors() //
|
||||
.awaitTerminalEvent();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import example.springdata.redis.RedisTestConfiguration;
|
||||
import example.springdata.redis.test.util.RequiresRedisServer;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
@@ -37,6 +38,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.ReactiveStringCommands.SetCommand;
|
||||
import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
||||
import org.springframework.data.redis.util.ByteUtils;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
/**
|
||||
@@ -75,13 +77,14 @@ public class KeyCommandsTests {
|
||||
|
||||
generateRandomKeys(50);
|
||||
|
||||
this.connection.keyCommands() //
|
||||
Mono<Long> keyCount = connection.keyCommands() //
|
||||
.keys(ByteBuffer.wrap(serializer.serialize(KEY_PATTERN))) //
|
||||
.flatMapMany(Flux::fromIterable) //
|
||||
.doOnNext(byteBuffer -> System.out.println(toString(byteBuffer))) //
|
||||
.count() //
|
||||
.doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count))) //
|
||||
.block();
|
||||
.doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count)));
|
||||
|
||||
StepVerifier.create(keyCount).expectNext(50L).verifyComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -90,19 +93,19 @@ public class KeyCommandsTests {
|
||||
@Test
|
||||
public void storeToListAndPop() {
|
||||
|
||||
Mono<PopResult> popResult = this.connection.listCommands()
|
||||
Mono<PopResult> popResult = connection.listCommands()
|
||||
.brPop(Collections.singletonList(ByteBuffer.wrap("list".getBytes())), Duration.ofSeconds(5));
|
||||
|
||||
Mono<Long> llen = this.connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes()));
|
||||
Mono<Long> llen = connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes()));
|
||||
|
||||
this.connection.listCommands() //
|
||||
Mono<Long> popAndLlen = connection.listCommands() //
|
||||
.rPush(ByteBuffer.wrap("list".getBytes()), Collections.singletonList(ByteBuffer.wrap("item".getBytes())))
|
||||
.flatMap(l -> popResult) //
|
||||
.doOnNext(result -> System.out.println(toString(result.getValue()))) //
|
||||
.flatMap(result -> llen) //
|
||||
.doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count))) //
|
||||
.then() //
|
||||
.block();
|
||||
.doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count)));//
|
||||
|
||||
StepVerifier.create(popAndLlen).expectNext(0L).verifyComplete();
|
||||
}
|
||||
|
||||
private void generateRandomKeys(int nrKeys) {
|
||||
@@ -113,17 +116,11 @@ public class KeyCommandsTests {
|
||||
.map(key -> SetCommand.set(key) //
|
||||
.value(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes())));
|
||||
|
||||
this.connection.stringCommands() //
|
||||
.set(generator) //
|
||||
.then() //
|
||||
.block();
|
||||
StepVerifier.create(connection.stringCommands().set(generator)).expectNextCount(nrKeys).verifyComplete();
|
||||
|
||||
}
|
||||
|
||||
private static String toString(ByteBuffer byteBuffer) {
|
||||
|
||||
byte[] bytes = new byte[byteBuffer.remaining()];
|
||||
byteBuffer.get(bytes);
|
||||
return new String(bytes);
|
||||
return new String(ByteUtils.getBytes(byteBuffer));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user