#215 - Add reactive examples for MongoDB, Apache Cassandra and Redis.

This commit is contained in:
Mark Paluch
2016-10-28 17:46:19 +02:00
committed by Oliver Gierke
parent cf5d9f3562
commit 20879e7fa3
29 changed files with 1855 additions and 0 deletions

View File

@@ -20,6 +20,7 @@
<module>util</module>
<module>example</module>
<module>java8</module>
<module>reactive</module>
</modules>
<dependencies>

View File

@@ -0,0 +1,79 @@
# Spring Data Cassandra 2.0 - Reactive examples
This project contains samples of reactive data access features with Spring Data (Cassandra).
## Reactive Template API usage with `ReactiveCassandraTemplate`
The main reactive Template API class is `ReactiveCassandraTemplate`, ideally used through its interface `ReactiveCassandraOperations`. It defines a basic set of reactive data access operations using [Project Reactor](http://projectreactor.io) `Mono` and `Flux` reactive types.
```java
template.insert(Flux.just(new Person("Walter", "White", 50),
new Person("Skyler", "White", 45),
new Person("Saul", "Goodman", 42),
new Person("Jesse", "Pinkman", 27)));
Flux<Person> flux = template.select(select()
.from("person")
.where(eq("lastname", "White")), Person.class);
```
The test cases in `ReactiveCassandraTemplateIntegrationTest` show basic Template API usage.
Reactive data access reads and converts individual elements while processing the stream.
## Reactive Repository support
Spring Data Cassandra provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types.
```java
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(String lastname);
@Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
// Accept parameter inside a reactive type for deferred execution
Flux<Person> findByLastname(Mono<String> lastname);
Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);
@InfiniteStream // Use a tailable cursor
Flux<Person> findWithTailableCursorBy();
}
```
```java
public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {
Observable<Person> findByLastname(String lastname);
@Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1")
Single<Person> findByFirstnameAndLastname(String firstname, String lastname);
// Accept parameter inside a reactive type for deferred execution
Observable<Person> findByLastname(Single<String> lastname);
Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);
@InfiniteStream // Use a tailable cursor
Observable<Person> findWithTailableCursorBy();
}
```
## Preparation
### Install Cassandra
Before we can start we have to install Cassandra, e.g. via brew on Max OS.
More details can be found here: https://wiki.apache.org/cassandra/GettingStarted
### Start Cassandra
```
/usr/local/bin/cassandra -f
```
That should be enough to get you started.
Now you can simply type ```mvn clean install``` to run the example.

View File

@@ -0,0 +1,65 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.data.examples</groupId>
<artifactId>spring-data-cassandra-examples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-data-cassandra-reactive</artifactId>
<name>Spring Data Cassandra - Reactive features</name>
<properties>
<spring-data-releasetrain.version>Kay-BUILD-SNAPSHOT</spring-data-releasetrain.version>
<spring.version>5.0.0.M3</spring.version>
<reactor.version>3.0.3.RELEASE</reactor.version>
<rxjava.version>1.2.1</rxjava.version>
<rxjava-reactive-streams.version>1.2.0</rxjava-reactive-streams.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-cql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<version>${rxjava-reactive-streams.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spring-data-cassandra-example-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,41 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.cassandra.config.SchemaAction;
import org.springframework.data.cassandra.config.java.AbstractReactiveCassandraConfiguration;
import org.springframework.data.cassandra.repository.config.EnableReactiveCassandraRepositories;
/**
* Simple configuration for reactive Cassandra support.
*
* @author Mark Paluch
*/
@SpringBootApplication
@EnableReactiveCassandraRepositories
class ApplicationConfiguration extends AbstractReactiveCassandraConfiguration {
@Override
protected String getKeyspaceName() {
return "example";
}
@Override
public SchemaAction getSchemaAction() {
return SchemaAction.RECREATE;
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.cassandra.core.PrimaryKeyType;
import org.springframework.data.cassandra.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.mapping.Table;
/**
* An entity to represent a Person.
*
* @author Mark Paluch
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Table
public class Person {
@PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED, ordinal = 2) private String firstname;
@PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED, ordinal = 1) private String lastname;
private int age;
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
/**
* Repository interface to manage {@link Person} instances.
*
* @author Mark Paluch
*/
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {
/**
* Derived query selecting by {@code lastname}.
*
* @param lastname
* @return
*/
Flux<Person> findByLastname(String lastname);
/**
* String query selecting one entity.
*
* @param lastname
* @return
*/
@Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1")
Mono<Person> findByFirstnameInAndLastname(String firstname, String lastname);
/**
* Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require
* blocking to obtain the parameter value.
*
* @param lastname
* @return
*/
Flux<Person> findByLastname(Mono<String> lastname);
/**
* Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution that
* does not require blocking to obtain the parameter value.
*
* @param firstname
* @param lastname
* @return
*/
Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import rx.Observable;
import rx.Single;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.data.repository.reactive.RxJava1CrudRepository;
/**
* Repository interface to manage {@link Person} instances.
*
* @author Mark Paluch
*/
public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {
/**
* Derived query selecting by {@code lastname}.
*
* @param lastname
* @return
*/
Observable<Person> findByLastname(String lastname);
/**
* String query selecting one entity.
*
* @param lastname
* @return
*/
@Query("SELECT * FROM person WHERE firstname = ?0 and lastname = ?1")
Single<Person> findByFirstnameAndLastname(String firstname, String lastname);
/**
* Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require
* blocking to obtain the parameter value.
*
* @param lastname
* @return
*/
Observable<Person> findByLastname(Single<String> lastname);
/**
* Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution that
* does not require blocking to obtain the parameter value.
*
* @param firstname
* @param lastname
* @return
*/
Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);
}

View File

@@ -0,0 +1,103 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
import static org.assertj.core.api.Assertions.*;
import example.springdata.cassandra.util.RequiresCassandraKeyspace;
import reactor.core.publisher.Flux;
import rx.RxReactiveStreams;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.cassandra.core.ReactiveCassandraTemplate;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Integration test for {@link ReactiveCassandraTemplate}.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveCassandraTemplateIntegrationTest {
@ClassRule public final static RequiresCassandraKeyspace CASSANDRA_KEYSPACE = RequiresCassandraKeyspace.onLocalhost();
@Autowired ReactiveCassandraTemplate template;
/**
* Truncate table and insert some rows.
*/
@Before
public void setUp() {
template.truncate(Person.class) //
.thenMany(template.insert(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27)))) //
.then() //
.block();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
* the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
public void shouldInsertAndCountData() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
template.count(Person.class) //
.doOnNext(System.out::println) //
.thenMany(template.insert(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> template.count(Person.class)) //
.doOnNext(System.out::println) //
.doOnComplete(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
public void convertReactorTypesToRxJava1() throws Exception {
Flux<Person> flux = template.select(select().from("person").where(eq("lastname", "White")), Person.class);
long count = RxReactiveStreams.toObservable(flux) //
.count() //
.toSingle() //
.toBlocking() //
.value(); //
assertThat(count).isEqualTo(2);
}
}

View File

@@ -0,0 +1,149 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactivePersonRepositoryIntegrationTest {
@Autowired ReactivePersonRepository repository;
/**
* Clear table and insert some rows.
*/
@Before
public void setUp() {
repository.deleteAll() //
.thenMany(repository.save(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27))))
.then() //
.block();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
*/
@Test
public void shouldInsertAndCountData() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.count() //
.doOnNext(System.out::println) //
.thenMany(repository.save(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> repository.count()) //
.doOnNext(System.out::println) //
.doOnComplete(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Result set {@link com.datastax.driver.core.Row}s are converted to entities as they are emitted. Reactive pull and
* prefetch define the amount of fetched records.
*/
@Test
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.findAll() //
.doOnNext(System.out::println) //
.doOnComplete(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithQueryDerivation() {
List<Person> whites = repository.findByLastname("White") //
.collectList() //
.block();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using a string query.
*/
@Test
public void shouldQueryDataWithStringQuery() {
Person heisenberg = repository.findByFirstnameInAndLastname("Walter", "White") //
.block();
assertThat(heisenberg).isNotNull();
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
List<Person> whites = repository.findByLastname(Mono.just("White")) //
.collectList() //
.block();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using query derivation and deferred parameter resolution.
*/
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") //
.block();
assertThat(heisenberg).isNotNull();
}
}

View File

@@ -0,0 +1,160 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.cassandra.people;
import static org.assertj.core.api.Assertions.*;
import example.springdata.cassandra.util.RequiresCassandraKeyspace;
import rx.Completable;
import rx.Observable;
import rx.Single;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Integration test for {@link RxJava1PersonRepository} using RxJava1 types. Note that
* {@link ReactiveCassandraOperations} is only available using Project Reactor types as the native Template API
* implementation does not come in multiple reactive flavors.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RxJava1PersonRepositoryIntegrationTest {
@ClassRule public final static RequiresCassandraKeyspace CASSANDRA_KEYSPACE = RequiresCassandraKeyspace.onLocalhost();
@Autowired RxJava1PersonRepository repository;
@Autowired ReactiveCassandraOperations operations;
@Before
public void setUp() throws Exception {
Completable deleteAll = repository.deleteAll();
Observable<Person> save = repository.save(Observable.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27)));
deleteAll.andThen(save).toBlocking().last();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
*/
@Test
public void shouldInsertAndCountData() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.count() //
.doOnSuccess(System.out::println) //
.toObservable() //
.switchMap(count -> repository.save(Observable.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.toSingle() //
.flatMap(v -> repository.count()) //
.doOnSuccess(System.out::println) //
.doAfterTerminate(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Result set {@link com.datastax.driver.core.Row}s are converted to entities as they are emitted. Reactive pull and
* prefetch define the amount of fetched records.
*/
@Test
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.findAll() //
.doOnNext(System.out::println) //
.doOnCompleted(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithQueryDerivation() {
List<Person> whites = repository.findByLastname("White") //
.toList() //
.toBlocking() //
.last();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using a string query.
*/
@Test
public void shouldQueryDataWithStringQuery() {
Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
.toBlocking() //
.value();
assertThat(heisenberg).isNotNull();
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
List<Person> whites = repository.findByLastname(Single.just("White")) //
.toList() //
.toBlocking() //
.single();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using query derivation and deferred parameter resolution.
*/
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
.toBlocking().value();
assertThat(heisenberg).isNotNull();
}
}

View File

@@ -0,0 +1,5 @@
/**
* Package showing usage of Spring Data Cassandra Reactive Repositories and reactive Cassandra template.
*/
package example.springdata.cassandra.people;

View File

@@ -0,0 +1,2 @@
logging.level.org.springframework.data.cassandra=INFO

View File

@@ -24,6 +24,7 @@
<module>security</module>
<module>geo-json</module>
<module>query-by-example</module>
<module>reactive</module>
</modules>
<dependencies>

View File

@@ -0,0 +1,66 @@
# Spring Data MongoDB 2.0 - Reactive examples
This project contains samples of reactive data access features with Spring Data (MongoDB).
## Prerequisites
MongoDB requires the Reactive Streams driver to provide reactive data access.
The Reactive Streams driver maintains its own connections. Using Spring Data MongoDB Reactive support
together with blocking Spring Data MongoDB data access will open multiple connections to your MongoDB servers.
## Reactive Template API usage with `ReactiveMongoTemplate`
The main reactive Template API class is `ReactiveMongoTemplate`, ideally used through its interface `ReactiveMongoOperations`. It defines a basic set of reactive data access operations using [Project Reactor](http://projectreactor.io) `Mono` and `Flux` reactive types.
```java
template.insertAll(Flux.just(new Person("Walter", "White", 50),
new Person("Skyler", "White", 45),
new Person("Saul", "Goodman", 42),
new Person("Jesse", "Pinkman", 27)));
Flux<Person> flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class);
```
The test cases in `ReactiveMongoTemplateIntegrationTest` show basic Template API usage.
Reactive data access reads and converts individual elements while processing the stream.
## Reactive Repository support
Spring Data MongoDB provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types.
```java
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(String lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
// Accept parameter inside a reactive type for deferred execution
Flux<Person> findByLastname(Mono<String> lastname);
Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);
@InfiniteStream // Use a tailable cursor
Flux<Person> findWithTailableCursorBy();
}
```
```java
public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {
Observable<Person> findByLastname(String lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Single<Person> findByFirstnameAndLastname(String firstname, String lastname);
// Accept parameter inside a reactive type for deferred execution
Observable<Person> findByLastname(Single<String> lastname);
Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);
@InfiniteStream // Use a tailable cursor
Observable<Person> findWithTailableCursorBy();
}
```

60
mongodb/reactive/pom.xml Normal file
View File

@@ -0,0 +1,60 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.data.examples</groupId>
<artifactId>spring-data-mongodb-examples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-data-mongodb-reactive</artifactId>
<name>Spring Data MongoDB - Reactive features</name>
<properties>
<spring-data-releasetrain.version>Kay-BUILD-SNAPSHOT</spring-data-releasetrain.version>
<spring.version>5.0.0.M3</spring.version>
<reactor.version>3.0.3.RELEASE</reactor.version>
<rxjava.version>1.2.1</rxjava.version>
<rxjava-reactive-streams.version>1.2.0</rxjava-reactive-streams.version>
<mongodb-driver-reactivestreams.version>1.2.0</mongodb-driver-reactivestreams.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>${mongodb-driver-reactivestreams.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<version>${rxjava-reactive-streams.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.embedded.EmbeddedMongoAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.mapping.event.LoggingEventListener;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
/**
* Simple configuration that registers a {@link LoggingEventListener} to demonstrate mapping behavior when streaming
* data.
*
* @author Mark Paluch
*/
@SpringBootApplication(exclude = { MongoAutoConfiguration.class, MongoDataAutoConfiguration.class })
@EnableReactiveMongoRepositories
@AutoConfigureAfter(EmbeddedMongoAutoConfiguration.class)
class ApplicationConfiguration extends AbstractReactiveMongoConfiguration {
@Bean
public LoggingEventListener mongoEventListener() {
return new LoggingEventListener();
}
@Override
@Bean
public MongoClient mongoClient() {
return MongoClients.create();
}
@Override
protected String getDatabaseName() {
return "reactive";
}
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
/**
* An entity to represent a Person.
*
* @author Mark Paluch
*/
@Data
@RequiredArgsConstructor
@Document
public class Person {
private @Id String id;
private final String firstname;
private final String lastname;
private final int age;
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.mongodb.repository.InfiniteStream;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
/**
* Repository interface to manage {@link Person} instances.
*
* @author Mark Paluch
*/
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {
/**
* Derived query selecting by {@code lastname}.
*
* @param lastname
* @return
*/
Flux<Person> findByLastname(String lastname);
/**
* String query selecting one entity.
*
* @param lastname
* @return
*/
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
/**
* Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require
* blocking to obtain the parameter value.
*
* @param lastname
* @return
*/
Flux<Person> findByLastname(Mono<String> lastname);
/**
* Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution that
* does not require blocking to obtain the parameter value.
*
* @param firstname
* @param lastname
* @return
*/
Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);
/**
* Use a tailable cursor to emit a stream of entities as new entities are written to the capped collection.
*
* @return
*/
@InfiniteStream
Flux<Person> findWithTailableCursorBy();
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import rx.Observable;
import rx.Single;
import org.springframework.data.mongodb.repository.InfiniteStream;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.RxJava1CrudRepository;
/**
* Repository interface to manage {@link Person} instances.
*
* @author Mark Paluch
*/
public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {
/**
* Derived query selecting by {@code lastname}.
*
* @param lastname
* @return
*/
Observable<Person> findByLastname(String lastname);
/**
* String query selecting one entity.
*
* @param lastname
* @return
*/
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Single<Person> findByFirstnameAndLastname(String firstname, String lastname);
/**
* Derived query selecting by {@code lastname}. {@code lastname} uses deferred resolution that does not require
* blocking to obtain the parameter value.
*
* @param lastname
* @return
*/
Observable<Person> findByLastname(Single<String> lastname);
/**
* Derived query selecting by {@code firstname} and {@code lastname}. {@code firstname} uses deferred resolution which
* does not require blocking to obtain the parameter value.
*
* @param firstname
* @param lastname
* @return
*/
Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);
/**
* Use a tailable cursor to emit a stream of entities as new entities are written to the capped collection.
*
* @return
*/
@InfiniteStream
Observable<Person> findWithTailableCursorBy();
}

View File

@@ -0,0 +1,100 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Integration test for {@link ReactiveMongoTemplate}.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveMongoTemplateIntegrationTest {
@Autowired ReactiveMongoTemplate template;
@Before
public void setUp() {
template.collectionExists(Person.class) //
.flatMap(exists -> exists ? template.dropCollection(Person.class) : Mono.just(exists)) //
.flatMap(exists -> template.createCollection(Person.class)) //
.then() //
.block();
template
.insertAll(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27)).collectList())
.then() //
.block();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
* the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
public void shouldInsertAndCountData() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
template.count(new Query(), Person.class) //
.doOnNext(System.out::println) //
.thenMany(template.save(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> template.count(new Query(), Person.class)) //
.doOnNext(System.out::println) //
.doOnComplete(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
public void convertReactorTypesToRxJava1() throws Exception {
Flux<Person> flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class);
long count = RxReactiveStreams.toObservable(flux).count().toSingle().toBlocking().value();
assertThat(count).isEqualTo(2);
}
}

View File

@@ -0,0 +1,181 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import static org.assertj.core.api.Assertions.*;
import reactor.core.Cancellation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactivePersonRepositoryIntegrationTest {
@Autowired ReactivePersonRepository repository;
@Autowired ReactiveMongoOperations operations;
@Before
public void setUp() {
operations.collectionExists(Person.class) //
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
.flatMap(o -> operations.createCollection(Person.class, new CollectionOptions(1024 * 1024, 100, true))) //
.then() //
.block();
repository
.save(Flux.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27))) //
.then() //
.block();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
*/
@Test
public void shouldInsertAndCountData() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.count() //
.doOnNext(System.out::println) //
.thenMany(repository.save(Flux.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.flatMap(v -> repository.count()) //
.doOnNext(System.out::println) //
.doOnComplete(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.findAll() //
.doOnNext(System.out::println) //
.doOnComplete(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* A tailable cursor streams data using {@link Flux} as it arrives inside the capped collection.
*/
@Test
public void shouldStreamDataWithTailableCursor() throws Exception {
Cancellation cancellation = repository.findWithTailableCursorBy() //
.doOnNext(System.out::println) //
.doOnComplete(() -> System.out.println("Complete")) //
.doOnTerminate(() -> System.out.println("Terminated")) //
.subscribe();
Thread.sleep(100);
repository.save(new Person("Tuco", "Salamanca", 33)).subscribe();
Thread.sleep(100);
repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe();
Thread.sleep(100);
cancellation.dispose();
repository.save(new Person("Gus", "Fring", 53)).subscribe();
Thread.sleep(100);
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithQueryDerivation() {
List<Person> whites = repository.findByLastname("White") //
.collectList() //
.block();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using a string query.
*/
@Test
public void shouldQueryDataWithStringQuery() {
Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
.block();
assertThat(heisenberg).isNotNull();
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
List<Person> whites = repository.findByLastname(Mono.just("White")) //
.collectList() //
.block();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using query derivation and deferred parameter resolution.
*/
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") //
.block();
assertThat(heisenberg).isNotNull();
}
}

View File

@@ -0,0 +1,189 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.mongodb.people;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import rx.Subscription;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Integration test for {@link RxJava1PersonRepository} using RxJava1 types. Note that {@link ReactiveMongoOperations}
* is only available using Project Reactor types as the native Template API implementation does not come in multiple
* reactive flavors.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RxJava1PersonRepositoryIntegrationTest {
@Autowired RxJava1PersonRepository repository;
@Autowired ReactiveMongoOperations operations;
@Before
public void setUp() {
operations.collectionExists(Person.class) //
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
.flatMap(o -> operations.createCollection(Person.class, new CollectionOptions(1024 * 1024, 100, true))) //
.then() //
.block();
repository
.save(Observable.just(new Person("Walter", "White", 50), //
new Person("Skyler", "White", 45), //
new Person("Saul", "Goodman", 42), //
new Person("Jesse", "Pinkman", 27))) //
.toBlocking() //
.last();
}
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining.
*/
@Test
public void shouldInsertAndCountData() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.count() //
.doOnSuccess(System.out::println) //
.toObservable() //
.switchMap(count -> repository.save(Observable.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62)))) //
.last() //
.toSingle() //
.flatMap(v -> repository.count()) //
.doOnSuccess(System.out::println) //
.doAfterTerminate(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* Note that the all object conversions are performed before the results are printed to the console.
*/
@Test
public void shouldPerformConversionBeforeResultProcessing() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
repository.findAll() //
.doOnNext(System.out::println) //
.doOnCompleted(countDownLatch::countDown) //
.doOnError(throwable -> countDownLatch.countDown()) //
.subscribe();
countDownLatch.await();
}
/**
* A tailable cursor streams data using {@link Flux} as it arrives inside the capped collection.
*/
@Test
public void shouldStreamDataWithTailableCursor() throws Exception {
Subscription subscription = repository.findWithTailableCursorBy() //
.doOnNext(System.out::println) //
.doOnCompleted(() -> System.out.println("Complete")) //
.doOnTerminate(() -> System.out.println("Terminated")) //
.subscribe();
Thread.sleep(100);
repository.save(new Person("Tuco", "Salamanca", 33)).subscribe();
Thread.sleep(100);
repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe();
Thread.sleep(100);
subscription.unsubscribe();
repository.save(new Person("Gus", "Fring", 53)).subscribe();
Thread.sleep(100);
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithQueryDerivation() {
List<Person> whites = repository.findByLastname("White") //
.toList() //
.toBlocking() //
.last();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using a string query.
*/
@Test
public void shouldQueryDataWithStringQuery() {
Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") //
.toBlocking() //
.value();
assertThat(heisenberg).isNotNull();
}
/**
* Fetch data using query derivation.
*/
@Test
public void shouldQueryDataWithDeferredQueryDerivation() {
List<Person> whites = repository.findByLastname(Single.just("White")) //
.toList() //
.toBlocking() //
.single();
assertThat(whites).hasSize(2);
}
/**
* Fetch data using query derivation and deferred parameter resolution.
*/
@Test
public void shouldQueryDataWithMixedDeferredQueryDerivation() {
Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") //
.toBlocking().value();
assertThat(heisenberg).isNotNull();
}
}

View File

@@ -0,0 +1,4 @@
/**
* Package showing usage of Spring Data MongoDB Reactive Repositories and reactive MongoDB template.
*/
package example.springdata.mongodb.people;

View File

@@ -21,6 +21,7 @@
<module>example</module>
<module>cluster</module>
<module>repositories</module>
<module>reactive</module>
</modules>
<dependencies>

8
redis/reactive/README.md Normal file
View File

@@ -0,0 +1,8 @@
# Spring Data Redis 2.0 - Reactive examples
This project contains samples of reactive data access features with Spring Data (Redis).
## Prerequisites
This project contains samples of specific features of Spring Data Redis using reactive infrastructure.

49
redis/reactive/pom.xml Normal file
View File

@@ -0,0 +1,49 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-data-redis-reactive</artifactId>
<name>Spring Data Redis - Reactive support</name>
<parent>
<groupId>org.springframework.data.examples</groupId>
<artifactId>spring-data-redis-examples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<properties>
<spring-data-releasetrain.version>Kay-BUILD-SNAPSHOT</spring-data-releasetrain.version>
<spring.version>5.0.0.M3</spring.version>
<reactor.version>3.0.3.RELEASE</reactor.version>
<lettuce.version>5.0.0.Beta1</lettuce.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>biz.paluch.redis</groupId>
<artifactId>lettuce</artifactId>
<version>${lettuce.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spring-data-redis-example-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.redis;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
/**
* @author Mark Paluch
*/
@SpringBootApplication
public class RedisTestConfiguration {
@Autowired RedisConnectionFactory factory;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory();
}
/**
* Clear database before shut down.
*/
public @PreDestroy void flushTestDb() {
factory.getConnection().flushDb();
}
}

View File

@@ -0,0 +1,128 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.springdata.redis.commands;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.ReactiveListCommands.PopResult;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStringCommands.SetCommand;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.test.context.junit4.SpringRunner;
import example.springdata.redis.RedisTestConfiguration;
import example.springdata.redis.test.util.RequiresRedisServer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Show usage of reactive operations on Redis keys using low level API provided by {@link RedisConnectionFactory}.
*
* @author Mark Paluch
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedisTestConfiguration.class)
public class KeyOperationsTests {
// we only want to run this tests when redis is up an running
public static @ClassRule RequiresRedisServer requiresServer = RequiresRedisServer.onLocalhost();
private static final String PREFIX = KeyOperationsTests.class.getSimpleName();
private static final String KEY_PATTERN = PREFIX + "*";
@Autowired RedisConnectionFactory connectionFactory;
private ReactiveRedisConnection connection;
private RedisSerializer<String> serializer = new StringRedisSerializer();
@Before
public void setUp() {
this.connection = connectionFactory.getReactiveConnection();
}
/**
* Uses {@code KEYS} command for loading all matching keys. <br />
* Note that {@code KEYS} is a blocking command that potentially might affect other operations execution time. <br />
* All keys will be loaded within <strong>one single</strong> operation.
*/
@Test
public void iterateOverKeysMatchingPrefixUsingKeysCommand() {
generateRandomKeys(50);
this.connection.keyCommands() //
.keys(ByteBuffer.wrap(serializer.serialize(KEY_PATTERN))) //
.flatMap(Flux::fromIterable) //
.doOnNext(byteBuffer -> System.out.println(toString(byteBuffer))) //
.count() //
.doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count))) //
.block();
}
/**
* Uses {@code RPUSH} to store an item inside a list and {@code BRPOP} <br />
*/
@Test
public void storeToListAndPop() {
Mono<PopResult> popResult = this.connection.listCommands()
.brPop(Collections.singletonList(ByteBuffer.wrap("list".getBytes())), Duration.ofSeconds(5));
Mono<Long> llen = this.connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes()));
this.connection.listCommands() //
.rPush(ByteBuffer.wrap("list".getBytes()), Collections.singletonList(ByteBuffer.wrap("item".getBytes())))
.flatMap(l -> popResult) //
.doOnNext(result -> System.out.println(toString(result.getValue()))) //
.flatMap(result -> llen) //
.doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count))) //
.then() //
.block();
}
private void generateRandomKeys(int nrKeys) {
Flux<String> keyFlux = Flux.range(0, nrKeys).map(i -> (PREFIX + "-" + i));
Flux<SetCommand> generator = keyFlux.map(String::getBytes).map(ByteBuffer::wrap) //
.map(key -> SetCommand.set(key) //
.value(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes())));
this.connection.stringCommands() //
.set(generator) //
.then() //
.block();
}
private static String toString(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return new String(bytes);
}
}

View File

@@ -0,0 +1 @@
logging.level.root=WARN