diff --git a/cassandra/pom.xml b/cassandra/pom.xml index f30cff29..60afd4a8 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -20,6 +20,7 @@ util example java8 + reactive diff --git a/cassandra/reactive/README.md b/cassandra/reactive/README.md new file mode 100644 index 00000000..561729fd --- /dev/null +++ b/cassandra/reactive/README.md @@ -0,0 +1,79 @@ +# Spring Data Cassandra 2.0 - Reactive examples + +This project contains samples of reactive data access features with Spring Data (Cassandra). + +## Reactive Template API usage with `ReactiveCassandraTemplate` + +The main reactive Template API class is `ReactiveCassandraTemplate`, ideally used through its interface `ReactiveCassandraOperations`. It defines a basic set of reactive data access operations using [Project Reactor](http://projectreactor.io) `Mono` and `Flux` reactive types. + +```java +template.insert(Flux.just(new Person("Walter", "White", 50), + new Person("Skyler", "White", 45), + new Person("Saul", "Goodman", 42), + new Person("Jesse", "Pinkman", 27))); + +Flux flux = template.select(select() + .from("person") + .where(eq("lastname", "White")), Person.class); +``` + +The test cases in `ReactiveCassandraTemplateIntegrationTest` show basic Template API usage. +Reactive data access reads and converts individual elements while processing the stream. + + +## Reactive Repository support + +Spring Data Cassandra provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types. + +```java +public interface ReactivePersonRepository extends ReactiveCrudRepository { + + Flux findByLastname(String lastname); + + @Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1") + Mono findByFirstnameAndLastname(String firstname, String lastname); + + // Accept parameter inside a reactive type for deferred execution + Flux findByLastname(Mono lastname); + + Mono findByFirstnameAndLastname(Mono firstname, String lastname); + + @InfiniteStream // Use a tailable cursor + Flux findWithTailableCursorBy(); +} +``` + +```java +public interface RxJava1PersonRepository extends RxJava1CrudRepository { + + Observable findByLastname(String lastname); + + @Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1") + Single findByFirstnameAndLastname(String firstname, String lastname); + + // Accept parameter inside a reactive type for deferred execution + Observable findByLastname(Single lastname); + + Single findByFirstnameAndLastname(Single firstname, String lastname); + + @InfiniteStream // Use a tailable cursor + Observable findWithTailableCursorBy(); +} +``` + +## Preparation + +### Install Cassandra + +Before we can start we have to install Cassandra, e.g. via brew on Max OS. + +More details can be found here: https://wiki.apache.org/cassandra/GettingStarted + +### Start Cassandra + +``` +/usr/local/bin/cassandra -f +``` + +That should be enough to get you started. +Now you can simply type ```mvn clean install``` to run the example. diff --git a/cassandra/reactive/pom.xml b/cassandra/reactive/pom.xml new file mode 100644 index 00000000..1ec7c6da --- /dev/null +++ b/cassandra/reactive/pom.xml @@ -0,0 +1,65 @@ + + 4.0.0 + + + org.springframework.data.examples + spring-data-cassandra-examples + 1.0.0.BUILD-SNAPSHOT + + + spring-data-cassandra-reactive + Spring Data Cassandra - Reactive features + + + Kay-BUILD-SNAPSHOT + 5.0.0.M3 + 3.0.3.RELEASE + 1.2.1 + 1.2.0 + + + + + + org.springframework.data + spring-data-commons + + + + org.springframework.data + spring-cql + + + + org.springframework.data + spring-data-cassandra + + + + io.projectreactor + reactor-core + + + + io.reactivex + rxjava + ${rxjava.version} + + + + io.reactivex + rxjava-reactive-streams + ${rxjava-reactive-streams.version} + + + + ${project.groupId} + spring-data-cassandra-example-utils + ${project.version} + test + + + + + diff --git a/cassandra/reactive/src/main/java/example/springdata/cassandra/people/ApplicationConfiguration.java b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/ApplicationConfiguration.java new file mode 100644 index 00000000..6bb15377 --- /dev/null +++ b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/ApplicationConfiguration.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.data.cassandra.config.SchemaAction; +import org.springframework.data.cassandra.config.java.AbstractReactiveCassandraConfiguration; +import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories; + +/** + * Simple configuration for reactive Cassandra support. + * + * @author Mark Paluch + */ +@SpringBootApplication +@EnableReactiveCassandraRepositories +class ApplicationConfiguration extends AbstractReactiveCassandraConfiguration { + + @Override + protected String getKeyspaceName() { + return "example"; + } + + @Override + public SchemaAction getSchemaAction() { + return SchemaAction.RECREATE; + } +} diff --git a/cassandra/reactive/src/main/java/example/springdata/cassandra/people/Person.java b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/Person.java new file mode 100644 index 00000000..7d6b8703 --- /dev/null +++ b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/Person.java @@ -0,0 +1,40 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import org.springframework.cassandra.core.PrimaryKeyType; +import org.springframework.data.cassandra.mapping.PrimaryKeyColumn; +import org.springframework.data.cassandra.mapping.Table; + +/** + * An entity to represent a Person. + * + * @author Mark Paluch + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Table +public class Person { + + @PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED, ordinal = 2) private String firstname; + @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED, ordinal = 1) private String lastname; + private int age; +} diff --git a/cassandra/reactive/src/main/java/example/springdata/cassandra/people/ReactivePersonRepository.java b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/ReactivePersonRepository.java new file mode 100644 index 00000000..db394ad6 --- /dev/null +++ b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/ReactivePersonRepository.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.data.cassandra.repository.Query; +import org.springframework.data.repository.reactive.ReactiveCrudRepository; + +/** + * Repository interface to manage {@link Person} instances. + * + * @author Mark Paluch + */ +public interface ReactivePersonRepository extends ReactiveCrudRepository { + + /** + * Derived query selecting by {@code lastname}. + * + * @param lastname + * @return + */ + Flux findByLastname(String lastname); + + /** + * String query selecting one entity. + * + * @param lastname + * @return + */ + @Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1") + Mono findByFirstnameInAndLastname(String firstname, String lastname); + + /** + * Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require + * blocking to obtain the parameter value. + * + * @param lastname + * @return + */ + Flux findByLastname(Mono lastname); + + /** + * Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution that + * does not require blocking to obtain the parameter value. + * + * @param firstname + * @param lastname + * @return + */ + Mono findByFirstnameAndLastname(Mono firstname, String lastname); +} diff --git a/cassandra/reactive/src/main/java/example/springdata/cassandra/people/RxJava1PersonRepository.java b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/RxJava1PersonRepository.java new file mode 100644 index 00000000..6c9ad53c --- /dev/null +++ b/cassandra/reactive/src/main/java/example/springdata/cassandra/people/RxJava1PersonRepository.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import rx.Observable; +import rx.Single; + +import org.springframework.data.cassandra.repository.Query; +import org.springframework.data.repository.reactive.RxJava1CrudRepository; + +/** + * Repository interface to manage {@link Person} instances. + * + * @author Mark Paluch + */ +public interface RxJava1PersonRepository extends RxJava1CrudRepository { + + /** + * Derived query selecting by {@code lastname}. + * + * @param lastname + * @return + */ + Observable findByLastname(String lastname); + + /** + * String query selecting one entity. + * + * @param lastname + * @return + */ + @Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1") + Single findByFirstnameAndLastname(String firstname, String lastname); + + /** + * Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require + * blocking to obtain the parameter value. + * + * @param lastname + * @return + */ + Observable findByLastname(Single lastname); + + /** + * Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution that + * does not require blocking to obtain the parameter value. + * + * @param firstname + * @param lastname + * @return + */ + Single findByFirstnameAndLastname(Single firstname, String lastname); +} diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java new file mode 100644 index 00000000..d55e6dd8 --- /dev/null +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.*; +import static org.assertj.core.api.Assertions.*; + +import example.springdata.cassandra.util.RequiresCassandraKeyspace; +import reactor.core.publisher.Flux; +import rx.RxReactiveStreams; + +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.cassandra.core.ReactiveCassandraTemplate; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * Integration test for {@link ReactiveCassandraTemplate}. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class ReactiveCassandraTemplateIntegrationTest { + + @ClassRule public final static RequiresCassandraKeyspace CASSANDRA_KEYSPACE = RequiresCassandraKeyspace.onLocalhost(); + + @Autowired ReactiveCassandraTemplate template; + + /** + * Truncate table and insert some rows. + */ + @Before + public void setUp() { + + template.truncate(Person.class) // + .thenMany(template.insert(Flux.just(new Person("Walter", "White", 50), // + new Person("Skyler", "White", 45), // + new Person("Saul", "Goodman", 42), // + new Person("Jesse", "Pinkman", 27)))) // + .then() // + .block(); + } + + /** + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints + * the two counts ({@code 4} and {@code 6}) to the console. + */ + @Test + public void shouldInsertAndCountData() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + template.count(Person.class) // + .doOnNext(System.out::println) // + .thenMany(template.insert(Flux.just(new Person("Hank", "Schrader", 43), // + new Person("Mike", "Ehrmantraut", 62)))) // + .last() // + .flatMap(v -> template.count(Person.class)) // + .doOnNext(System.out::println) // + .doOnComplete(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Note that the all object conversions are performed before the results are printed to the console. + */ + @Test + public void convertReactorTypesToRxJava1() throws Exception { + + Flux flux = template.select(select().from("person").where(eq("lastname", "White")), Person.class); + + long count = RxReactiveStreams.toObservable(flux) // + .count() // + .toSingle() // + .toBlocking() // + .value(); // + + assertThat(count).isEqualTo(2); + } +} diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java new file mode 100644 index 00000000..91756ee0 --- /dev/null +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class ReactivePersonRepositoryIntegrationTest { + + @Autowired ReactivePersonRepository repository; + + /** + * Clear table and insert some rows. + */ + @Before + public void setUp() { + + repository.deleteAll() // + .thenMany(repository.save(Flux.just(new Person("Walter", "White", 50), // + new Person("Skyler", "White", 45), // + new Person("Saul", "Goodman", 42), // + new Person("Jesse", "Pinkman", 27)))) + .then() // + .block(); + } + + /** + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + */ + @Test + public void shouldInsertAndCountData() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.count() // + .doOnNext(System.out::println) // + .thenMany(repository.save(Flux.just(new Person("Hank", "Schrader", 43), // + new Person("Mike", "Ehrmantraut", 62)))) // + .last() // + .flatMap(v -> repository.count()) // + .doOnNext(System.out::println) // + .doOnComplete(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Result set {@link com.datastax.driver.core.Row}s are converted to entities as they are emitted. Reactive pull and + * prefetch define the amount of fetched records. + */ + @Test + public void shouldPerformConversionBeforeResultProcessing() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.findAll() // + .doOnNext(System.out::println) // + .doOnComplete(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithQueryDerivation() { + + List whites = repository.findByLastname("White") // + .collectList() // + .block(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using a string query. + */ + @Test + public void shouldQueryDataWithStringQuery() { + + Person heisenberg = repository.findByFirstnameInAndLastname("Walter", "White") // + .block(); + + assertThat(heisenberg).isNotNull(); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithDeferredQueryDerivation() { + + List whites = repository.findByLastname(Mono.just("White")) // + .collectList() // + .block(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using query derivation and deferred parameter resolution. + */ + @Test + public void shouldQueryDataWithMixedDeferredQueryDerivation() { + + Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") // + .block(); + + assertThat(heisenberg).isNotNull(); + } + +} diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava1PersonRepositoryIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava1PersonRepositoryIntegrationTest.java new file mode 100644 index 00000000..d24f2310 --- /dev/null +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava1PersonRepositoryIntegrationTest.java @@ -0,0 +1,160 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.cassandra.people; + +import static org.assertj.core.api.Assertions.*; + +import example.springdata.cassandra.util.RequiresCassandraKeyspace; +import rx.Completable; +import rx.Observable; +import rx.Single; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * Integration test for {@link RxJava1PersonRepository} using RxJava1 types. Note that + * {@link ReactiveCassandraOperations} is only available using Project Reactor types as the native Template API + * implementation does not come in multiple reactive flavors. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class RxJava1PersonRepositoryIntegrationTest { + + @ClassRule public final static RequiresCassandraKeyspace CASSANDRA_KEYSPACE = RequiresCassandraKeyspace.onLocalhost(); + + @Autowired RxJava1PersonRepository repository; + @Autowired ReactiveCassandraOperations operations; + + @Before + public void setUp() throws Exception { + + Completable deleteAll = repository.deleteAll(); + + Observable save = repository.save(Observable.just(new Person("Walter", "White", 50), // + new Person("Skyler", "White", 45), // + new Person("Saul", "Goodman", 42), // + new Person("Jesse", "Pinkman", 27))); + + deleteAll.andThen(save).toBlocking().last(); + } + + /** + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + */ + @Test + public void shouldInsertAndCountData() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.count() // + .doOnSuccess(System.out::println) // + .toObservable() // + .switchMap(count -> repository.save(Observable.just(new Person("Hank", "Schrader", 43), // + new Person("Mike", "Ehrmantraut", 62)))) // + .last() // + .toSingle() // + .flatMap(v -> repository.count()) // + .doOnSuccess(System.out::println) // + .doAfterTerminate(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Result set {@link com.datastax.driver.core.Row}s are converted to entities as they are emitted. Reactive pull and + * prefetch define the amount of fetched records. + */ + @Test + public void shouldPerformConversionBeforeResultProcessing() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.findAll() // + .doOnNext(System.out::println) // + .doOnCompleted(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithQueryDerivation() { + + List whites = repository.findByLastname("White") // + .toList() // + .toBlocking() // + .last(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using a string query. + */ + @Test + public void shouldQueryDataWithStringQuery() { + + Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") // + .toBlocking() // + .value(); + + assertThat(heisenberg).isNotNull(); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithDeferredQueryDerivation() { + + List whites = repository.findByLastname(Single.just("White")) // + .toList() // + .toBlocking() // + .single(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using query derivation and deferred parameter resolution. + */ + @Test + public void shouldQueryDataWithMixedDeferredQueryDerivation() { + + Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") // + .toBlocking().value(); + + assertThat(heisenberg).isNotNull(); + } +} diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/package-info.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/package-info.java new file mode 100644 index 00000000..b74fb714 --- /dev/null +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/package-info.java @@ -0,0 +1,5 @@ +/** + * Package showing usage of Spring Data Cassandra Reactive Repositories and reactive Cassandra template. + */ +package example.springdata.cassandra.people; + diff --git a/cassandra/reactive/src/test/resources/application.properties b/cassandra/reactive/src/test/resources/application.properties new file mode 100644 index 00000000..a1d10619 --- /dev/null +++ b/cassandra/reactive/src/test/resources/application.properties @@ -0,0 +1,2 @@ +logging.level.org.springframework.data.cassandra=INFO + diff --git a/mongodb/pom.xml b/mongodb/pom.xml index a0d4d9e1..162a84d6 100644 --- a/mongodb/pom.xml +++ b/mongodb/pom.xml @@ -24,6 +24,7 @@ security geo-json query-by-example + reactive diff --git a/mongodb/reactive/README.md b/mongodb/reactive/README.md new file mode 100644 index 00000000..5e9c0f2f --- /dev/null +++ b/mongodb/reactive/README.md @@ -0,0 +1,66 @@ +# Spring Data MongoDB 2.0 - Reactive examples + +This project contains samples of reactive data access features with Spring Data (MongoDB). + +## Prerequisites + +MongoDB requires the Reactive Streams driver to provide reactive data access. +The Reactive Streams driver maintains its own connections. Using Spring Data MongoDB Reactive support +together with blocking Spring Data MongoDB data access will open multiple connections to your MongoDB servers. + +## Reactive Template API usage with `ReactiveMongoTemplate` + +The main reactive Template API class is `ReactiveMongoTemplate`, ideally used through its interface `ReactiveMongoOperations`. It defines a basic set of reactive data access operations using [Project Reactor](http://projectreactor.io) `Mono` and `Flux` reactive types. + +```java +template.insertAll(Flux.just(new Person("Walter", "White", 50), + new Person("Skyler", "White", 45), + new Person("Saul", "Goodman", 42), + new Person("Jesse", "Pinkman", 27))); + +Flux flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class); +``` + +The test cases in `ReactiveMongoTemplateIntegrationTest` show basic Template API usage. +Reactive data access reads and converts individual elements while processing the stream. + + +## Reactive Repository support + +Spring Data MongoDB provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types. + +```java +public interface ReactivePersonRepository extends ReactiveCrudRepository { + + Flux findByLastname(String lastname); + + @Query("{ 'firstname': ?0, 'lastname': ?1}") + Mono findByFirstnameAndLastname(String firstname, String lastname); + + // Accept parameter inside a reactive type for deferred execution + Flux findByLastname(Mono lastname); + + Mono findByFirstnameAndLastname(Mono firstname, String lastname); + + @InfiniteStream // Use a tailable cursor + Flux findWithTailableCursorBy(); +} +``` + +```java +public interface RxJava1PersonRepository extends RxJava1CrudRepository { + + Observable findByLastname(String lastname); + + @Query("{ 'firstname': ?0, 'lastname': ?1}") + Single findByFirstnameAndLastname(String firstname, String lastname); + + // Accept parameter inside a reactive type for deferred execution + Observable findByLastname(Single lastname); + + Single findByFirstnameAndLastname(Single firstname, String lastname); + + @InfiniteStream // Use a tailable cursor + Observable findWithTailableCursorBy(); +} +``` \ No newline at end of file diff --git a/mongodb/reactive/pom.xml b/mongodb/reactive/pom.xml new file mode 100644 index 00000000..7a448f6c --- /dev/null +++ b/mongodb/reactive/pom.xml @@ -0,0 +1,60 @@ + + 4.0.0 + + + org.springframework.data.examples + spring-data-mongodb-examples + 1.0.0.BUILD-SNAPSHOT + + + spring-data-mongodb-reactive + Spring Data MongoDB - Reactive features + + + Kay-BUILD-SNAPSHOT + 5.0.0.M3 + 3.0.3.RELEASE + 1.2.1 + 1.2.0 + 1.2.0 + + + + + + org.springframework.data + spring-data-commons + + + + org.springframework.data + spring-data-mongodb + + + + io.projectreactor + reactor-core + + + + org.mongodb + mongodb-driver-reactivestreams + ${mongodb-driver-reactivestreams.version} + + + + io.reactivex + rxjava + ${rxjava.version} + + + + io.reactivex + rxjava-reactive-streams + ${rxjava-reactive-streams.version} + + + + + diff --git a/mongodb/reactive/src/main/java/example/springdata/mongodb/people/ApplicationConfiguration.java b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/ApplicationConfiguration.java new file mode 100644 index 00000000..a74c4888 --- /dev/null +++ b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/ApplicationConfiguration.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.embedded.EmbeddedMongoAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration; +import org.springframework.data.mongodb.core.mapping.event.LoggingEventListener; +import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories; + +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoClients; + +/** + * Simple configuration that registers a {@link LoggingEventListener} to demonstrate mapping behavior when streaming + * data. + * + * @author Mark Paluch + */ +@SpringBootApplication(exclude = { MongoAutoConfiguration.class, MongoDataAutoConfiguration.class }) +@EnableReactiveMongoRepositories +@AutoConfigureAfter(EmbeddedMongoAutoConfiguration.class) +class ApplicationConfiguration extends AbstractReactiveMongoConfiguration { + + @Bean + public LoggingEventListener mongoEventListener() { + return new LoggingEventListener(); + } + + @Override + @Bean + public MongoClient mongoClient() { + return MongoClients.create(); + } + + @Override + protected String getDatabaseName() { + return "reactive"; + } +} diff --git a/mongodb/reactive/src/main/java/example/springdata/mongodb/people/Person.java b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/Person.java new file mode 100644 index 00000000..00b2a76e --- /dev/null +++ b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/Person.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +/** + * An entity to represent a Person. + * + * @author Mark Paluch + */ +@Data +@RequiredArgsConstructor +@Document +public class Person { + + private @Id String id; + private final String firstname; + private final String lastname; + private final int age; +} diff --git a/mongodb/reactive/src/main/java/example/springdata/mongodb/people/ReactivePersonRepository.java b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/ReactivePersonRepository.java new file mode 100644 index 00000000..deb38af4 --- /dev/null +++ b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/ReactivePersonRepository.java @@ -0,0 +1,75 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.data.mongodb.repository.InfiniteStream; +import org.springframework.data.mongodb.repository.Query; +import org.springframework.data.repository.reactive.ReactiveCrudRepository; + +/** + * Repository interface to manage {@link Person} instances. + * + * @author Mark Paluch + */ +public interface ReactivePersonRepository extends ReactiveCrudRepository { + + /** + * Derived query selecting by {@code lastname}. + * + * @param lastname + * @return + */ + Flux findByLastname(String lastname); + + /** + * String query selecting one entity. + * + * @param lastname + * @return + */ + @Query("{ 'firstname': ?0, 'lastname': ?1}") + Mono findByFirstnameAndLastname(String firstname, String lastname); + + /** + * Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require + * blocking to obtain the parameter value. + * + * @param lastname + * @return + */ + Flux findByLastname(Mono lastname); + + /** + * Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution that + * does not require blocking to obtain the parameter value. + * + * @param firstname + * @param lastname + * @return + */ + Mono findByFirstnameAndLastname(Mono firstname, String lastname); + + /** + * Use a tailable cursor to emit a stream of entities as new entities are written to the capped collection. + * + * @return + */ + @InfiniteStream + Flux findWithTailableCursorBy(); +} diff --git a/mongodb/reactive/src/main/java/example/springdata/mongodb/people/RxJava1PersonRepository.java b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/RxJava1PersonRepository.java new file mode 100644 index 00000000..d01c829c --- /dev/null +++ b/mongodb/reactive/src/main/java/example/springdata/mongodb/people/RxJava1PersonRepository.java @@ -0,0 +1,75 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import rx.Observable; +import rx.Single; + +import org.springframework.data.mongodb.repository.InfiniteStream; +import org.springframework.data.mongodb.repository.Query; +import org.springframework.data.repository.reactive.RxJava1CrudRepository; + +/** + * Repository interface to manage {@link Person} instances. + * + * @author Mark Paluch + */ +public interface RxJava1PersonRepository extends RxJava1CrudRepository { + + /** + * Derived query selecting by {@code lastname}. + * + * @param lastname + * @return + */ + Observable findByLastname(String lastname); + + /** + * String query selecting one entity. + * + * @param lastname + * @return + */ + @Query("{ 'firstname': ?0, 'lastname': ?1}") + Single findByFirstnameAndLastname(String firstname, String lastname); + + /** + * Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require + * blocking to obtain the parameter value. + * + * @param lastname + * @return + */ + Observable findByLastname(Single lastname); + + /** + * Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution which + * does not require blocking to obtain the parameter value. + * + * @param firstname + * @param lastname + * @return + */ + Single findByFirstnameAndLastname(Single firstname, String lastname); + + /** + * Use a tailable cursor to emit a stream of entities as new entities are written to the capped collection. + * + * @return + */ + @InfiniteStream + Observable findWithTailableCursorBy(); +} diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java new file mode 100644 index 00000000..be0f264d --- /dev/null +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import rx.RxReactiveStreams; + +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.mongodb.core.ReactiveMongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * Integration test for {@link ReactiveMongoTemplate}. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class ReactiveMongoTemplateIntegrationTest { + + @Autowired ReactiveMongoTemplate template; + + @Before + public void setUp() { + + template.collectionExists(Person.class) // + .flatMap(exists -> exists ? template.dropCollection(Person.class) : Mono.just(exists)) // + .flatMap(exists -> template.createCollection(Person.class)) // + .then() // + .block(); + + template + .insertAll(Flux.just(new Person("Walter", "White", 50), // + new Person("Skyler", "White", 45), // + new Person("Saul", "Goodman", 42), // + new Person("Jesse", "Pinkman", 27)).collectList()) + .then() // + .block(); + } + + /** + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints + * the two counts ({@code 4} and {@code 6}) to the console. + */ + @Test + public void shouldInsertAndCountData() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + template.count(new Query(), Person.class) // + .doOnNext(System.out::println) // + .thenMany(template.save(Flux.just(new Person("Hank", "Schrader", 43), // + new Person("Mike", "Ehrmantraut", 62)))) // + .last() // + .flatMap(v -> template.count(new Query(), Person.class)) // + .doOnNext(System.out::println) // + .doOnComplete(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Note that the all object conversions are performed before the results are printed to the console. + */ + @Test + public void convertReactorTypesToRxJava1() throws Exception { + + Flux flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class); + + long count = RxReactiveStreams.toObservable(flux).count().toSingle().toBlocking().value(); + + assertThat(count).isEqualTo(2); + } +} diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java new file mode 100644 index 00000000..79774e9f --- /dev/null +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java @@ -0,0 +1,181 @@ +/* + * Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.Cancellation; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.mongodb.core.CollectionOptions; +import org.springframework.data.mongodb.core.ReactiveMongoOperations; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class ReactivePersonRepositoryIntegrationTest { + + @Autowired ReactivePersonRepository repository; + @Autowired ReactiveMongoOperations operations; + + @Before + public void setUp() { + + operations.collectionExists(Person.class) // + .flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) // + .flatMap(o -> operations.createCollection(Person.class, new CollectionOptions(1024 * 1024, 100, true))) // + .then() // + .block(); + + repository + .save(Flux.just(new Person("Walter", "White", 50), // + new Person("Skyler", "White", 45), // + new Person("Saul", "Goodman", 42), // + new Person("Jesse", "Pinkman", 27))) // + .then() // + .block(); + + } + + /** + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + */ + @Test + public void shouldInsertAndCountData() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.count() // + .doOnNext(System.out::println) // + .thenMany(repository.save(Flux.just(new Person("Hank", "Schrader", 43), // + new Person("Mike", "Ehrmantraut", 62)))) // + .last() // + .flatMap(v -> repository.count()) // + .doOnNext(System.out::println) // + .doOnComplete(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Note that the all object conversions are performed before the results are printed to the console. + */ + @Test + public void shouldPerformConversionBeforeResultProcessing() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.findAll() // + .doOnNext(System.out::println) // + .doOnComplete(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * A tailable cursor streams data using {@link Flux} as it arrives inside the capped collection. + */ + @Test + public void shouldStreamDataWithTailableCursor() throws Exception { + + Cancellation cancellation = repository.findWithTailableCursorBy() // + .doOnNext(System.out::println) // + .doOnComplete(() -> System.out.println("Complete")) // + .doOnTerminate(() -> System.out.println("Terminated")) // + .subscribe(); + + Thread.sleep(100); + + repository.save(new Person("Tuco", "Salamanca", 33)).subscribe(); + Thread.sleep(100); + + repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe(); + Thread.sleep(100); + + cancellation.dispose(); + + repository.save(new Person("Gus", "Fring", 53)).subscribe(); + Thread.sleep(100); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithQueryDerivation() { + + List whites = repository.findByLastname("White") // + .collectList() // + .block(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using a string query. + */ + @Test + public void shouldQueryDataWithStringQuery() { + + Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") // + .block(); + + assertThat(heisenberg).isNotNull(); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithDeferredQueryDerivation() { + + List whites = repository.findByLastname(Mono.just("White")) // + .collectList() // + .block(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using query derivation and deferred parameter resolution. + */ + @Test + public void shouldQueryDataWithMixedDeferredQueryDerivation() { + + Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") // + .block(); + + assertThat(heisenberg).isNotNull(); + } +} diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava1PersonRepositoryIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava1PersonRepositoryIntegrationTest.java new file mode 100644 index 00000000..d62c65b2 --- /dev/null +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava1PersonRepositoryIntegrationTest.java @@ -0,0 +1,189 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.mongodb.people; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import rx.Observable; +import rx.Single; +import rx.Subscription; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.mongodb.core.CollectionOptions; +import org.springframework.data.mongodb.core.ReactiveMongoOperations; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * Integration test for {@link RxJava1PersonRepository} using RxJava1 types. Note that {@link ReactiveMongoOperations} + * is only available using Project Reactor types as the native Template API implementation does not come in multiple + * reactive flavors. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class RxJava1PersonRepositoryIntegrationTest { + + @Autowired RxJava1PersonRepository repository; + @Autowired ReactiveMongoOperations operations; + + @Before + public void setUp() { + + operations.collectionExists(Person.class) // + .flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) // + .flatMap(o -> operations.createCollection(Person.class, new CollectionOptions(1024 * 1024, 100, true))) // + .then() // + .block(); + + repository + .save(Observable.just(new Person("Walter", "White", 50), // + new Person("Skyler", "White", 45), // + new Person("Saul", "Goodman", 42), // + new Person("Jesse", "Pinkman", 27))) // + .toBlocking() // + .last(); + } + + /** + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + */ + @Test + public void shouldInsertAndCountData() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.count() // + .doOnSuccess(System.out::println) // + .toObservable() // + .switchMap(count -> repository.save(Observable.just(new Person("Hank", "Schrader", 43), // + new Person("Mike", "Ehrmantraut", 62)))) // + .last() // + .toSingle() // + .flatMap(v -> repository.count()) // + .doOnSuccess(System.out::println) // + .doAfterTerminate(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * Note that the all object conversions are performed before the results are printed to the console. + */ + @Test + public void shouldPerformConversionBeforeResultProcessing() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(1); + + repository.findAll() // + .doOnNext(System.out::println) // + .doOnCompleted(countDownLatch::countDown) // + .doOnError(throwable -> countDownLatch.countDown()) // + .subscribe(); + + countDownLatch.await(); + } + + /** + * A tailable cursor streams data using {@link Flux} as it arrives inside the capped collection. + */ + @Test + public void shouldStreamDataWithTailableCursor() throws Exception { + + Subscription subscription = repository.findWithTailableCursorBy() // + .doOnNext(System.out::println) // + .doOnCompleted(() -> System.out.println("Complete")) // + .doOnTerminate(() -> System.out.println("Terminated")) // + .subscribe(); + + Thread.sleep(100); + + repository.save(new Person("Tuco", "Salamanca", 33)).subscribe(); + Thread.sleep(100); + + repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe(); + Thread.sleep(100); + + subscription.unsubscribe(); + + repository.save(new Person("Gus", "Fring", 53)).subscribe(); + Thread.sleep(100); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithQueryDerivation() { + + List whites = repository.findByLastname("White") // + .toList() // + .toBlocking() // + .last(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using a string query. + */ + @Test + public void shouldQueryDataWithStringQuery() { + + Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") // + .toBlocking() // + .value(); + + assertThat(heisenberg).isNotNull(); + } + + /** + * Fetch data using query derivation. + */ + @Test + public void shouldQueryDataWithDeferredQueryDerivation() { + + List whites = repository.findByLastname(Single.just("White")) // + .toList() // + .toBlocking() // + .single(); + + assertThat(whites).hasSize(2); + } + + /** + * Fetch data using query derivation and deferred parameter resolution. + */ + @Test + public void shouldQueryDataWithMixedDeferredQueryDerivation() { + + Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") // + .toBlocking().value(); + + assertThat(heisenberg).isNotNull(); + } +} diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/package-info.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/package-info.java new file mode 100644 index 00000000..2507b030 --- /dev/null +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/package-info.java @@ -0,0 +1,4 @@ +/** + * Package showing usage of Spring Data MongoDB Reactive Repositories and reactive MongoDB template. + */ +package example.springdata.mongodb.people; diff --git a/redis/pom.xml b/redis/pom.xml index f00419e3..8ed4e576 100644 --- a/redis/pom.xml +++ b/redis/pom.xml @@ -21,6 +21,7 @@ example cluster repositories + reactive diff --git a/redis/reactive/README.md b/redis/reactive/README.md new file mode 100644 index 00000000..77dd828a --- /dev/null +++ b/redis/reactive/README.md @@ -0,0 +1,8 @@ +# Spring Data Redis 2.0 - Reactive examples + +This project contains samples of reactive data access features with Spring Data (Redis). + +## Prerequisites + +This project contains samples of specific features of Spring Data Redis using reactive infrastructure. + diff --git a/redis/reactive/pom.xml b/redis/reactive/pom.xml new file mode 100644 index 00000000..81cba821 --- /dev/null +++ b/redis/reactive/pom.xml @@ -0,0 +1,49 @@ + + 4.0.0 + + spring-data-redis-reactive + Spring Data Redis - Reactive support + + + org.springframework.data.examples + spring-data-redis-examples + 1.0.0.BUILD-SNAPSHOT + ../pom.xml + + + + Kay-BUILD-SNAPSHOT + 5.0.0.M3 + 3.0.3.RELEASE + 5.0.0.Beta1 + + + + + + org.springframework.data + spring-data-redis + + + + io.projectreactor + reactor-core + + + + biz.paluch.redis + lettuce + ${lettuce.version} + + + + ${project.groupId} + spring-data-redis-example-utils + ${project.version} + test + + + + + diff --git a/redis/reactive/src/test/java/example/springdata/redis/RedisTestConfiguration.java b/redis/reactive/src/test/java/example/springdata/redis/RedisTestConfiguration.java new file mode 100644 index 00000000..26c7d1dc --- /dev/null +++ b/redis/reactive/src/test/java/example/springdata/redis/RedisTestConfiguration.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.redis; + +import javax.annotation.PreDestroy; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; + +/** + * @author Mark Paluch + */ +@SpringBootApplication +public class RedisTestConfiguration { + + @Autowired RedisConnectionFactory factory; + + @Bean + public RedisConnectionFactory redisConnectionFactory() { + return new LettuceConnectionFactory(); + } + + /** + * Clear database before shut down. + */ + public @PreDestroy void flushTestDb() { + factory.getConnection().flushDb(); + } +} diff --git a/redis/reactive/src/test/java/example/springdata/redis/commands/KeyOperationsTests.java b/redis/reactive/src/test/java/example/springdata/redis/commands/KeyOperationsTests.java new file mode 100644 index 00000000..3f124ab1 --- /dev/null +++ b/redis/reactive/src/test/java/example/springdata/redis/commands/KeyOperationsTests.java @@ -0,0 +1,128 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.springdata.redis.commands; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.redis.connection.ReactiveListCommands.PopResult; +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveStringCommands.SetCommand; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.test.context.junit4.SpringRunner; + +import example.springdata.redis.RedisTestConfiguration; +import example.springdata.redis.test.util.RequiresRedisServer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Show usage of reactive operations on Redis keys using low level API provided by {@link RedisConnectionFactory}. + * + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = RedisTestConfiguration.class) +public class KeyOperationsTests { + + // we only want to run this tests when redis is up an running + public static @ClassRule RequiresRedisServer requiresServer = RequiresRedisServer.onLocalhost(); + + private static final String PREFIX = KeyOperationsTests.class.getSimpleName(); + private static final String KEY_PATTERN = PREFIX + "*"; + + @Autowired RedisConnectionFactory connectionFactory; + + private ReactiveRedisConnection connection; + private RedisSerializer serializer = new StringRedisSerializer(); + + @Before + public void setUp() { + this.connection = connectionFactory.getReactiveConnection(); + } + + /** + * Uses {@code KEYS} command for loading all matching keys.
+ * Note that {@code KEYS} is a blocking command that potentially might affect other operations execution time.
+ * All keys will be loaded within one single operation. + */ + @Test + public void iterateOverKeysMatchingPrefixUsingKeysCommand() { + + generateRandomKeys(50); + + this.connection.keyCommands() // + .keys(ByteBuffer.wrap(serializer.serialize(KEY_PATTERN))) // + .flatMap(Flux::fromIterable) // + .doOnNext(byteBuffer -> System.out.println(toString(byteBuffer))) // + .count() // + .doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count))) // + .block(); + } + + /** + * Uses {@code RPUSH} to store an item inside a list and {@code BRPOP}
+ */ + @Test + public void storeToListAndPop() { + + Mono popResult = this.connection.listCommands() + .brPop(Collections.singletonList(ByteBuffer.wrap("list".getBytes())), Duration.ofSeconds(5)); + + Mono llen = this.connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes())); + + this.connection.listCommands() // + .rPush(ByteBuffer.wrap("list".getBytes()), Collections.singletonList(ByteBuffer.wrap("item".getBytes()))) + .flatMap(l -> popResult) // + .doOnNext(result -> System.out.println(toString(result.getValue()))) // + .flatMap(result -> llen) // + .doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count))) // + .then() // + .block(); + } + + private void generateRandomKeys(int nrKeys) { + + Flux keyFlux = Flux.range(0, nrKeys).map(i -> (PREFIX + "-" + i)); + + Flux generator = keyFlux.map(String::getBytes).map(ByteBuffer::wrap) // + .map(key -> SetCommand.set(key) // + .value(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes()))); + + this.connection.stringCommands() // + .set(generator) // + .then() // + .block(); + + } + + private static String toString(ByteBuffer byteBuffer) { + + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return new String(bytes); + } +} diff --git a/redis/reactive/src/test/resources/application.properties b/redis/reactive/src/test/resources/application.properties new file mode 100644 index 00000000..bac6605c --- /dev/null +++ b/redis/reactive/src/test/resources/application.properties @@ -0,0 +1 @@ +logging.level.root=WARN