Spring Data MongoDB - Change Streams Example

This project contains samples for consuming MongoDB 3.6 Change Streams with both the imperative and the reactive MongoDB Java drivers.

Imperative Style

Change stream events can be consumed using a MessageListener registered with a MessageListenerContainer. The container runs the change stream task on a separate thread and pushes each event to the MessageListener.

@Configuration
class Config {

	@Bean
	MessageListenerContainer messageListenerContainer(MongoTemplate template) {
		return new DefaultMessageListenerContainer(template);
	}
}
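The container's role described above (run a task on its own thread, push each event to a listener) can be sketched in plain Java. This illustrates the pattern only: SimpleContainer, deliver, and the queue standing in for the change stream are hypothetical, and nothing here reflects how DefaultMessageListenerContainer is actually implemented.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustration only: a hypothetical, stripped-down container, not Spring Data's implementation.
class ListenerContainerSketch {

	/** Takes events from a queue on a background thread and pushes each one to the listener. */
	static final class SimpleContainer<T> {

		private final Thread worker;

		SimpleContainer(BlockingQueue<T> source, Consumer<T> listener) {
			this.worker = new Thread(() -> {
				try {
					while (!Thread.currentThread().isInterrupted()) {
						listener.accept(source.take()); // blocks until an event arrives
					}
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt(); // exit quietly once stop() is called
				}
			}, "listener-container");
			this.worker.setDaemon(true);
			this.worker.start();
		}

		void stop() {
			worker.interrupt();
		}
	}

	/** Publishes the given names and returns the names of the threads that delivered them. */
	static List<String> deliver(List<String> names) {
		BlockingQueue<String> queue = new LinkedBlockingQueue<>();
		List<String> deliveredOn = new CopyOnWriteArrayList<>();
		CountDownLatch done = new CountDownLatch(names.size());

		SimpleContainer<String> container = new SimpleContainer<>(queue, name -> {
			System.out.println("Hello " + name);
			deliveredOn.add(Thread.currentThread().getName());
			done.countDown();
		});
		try {
			queue.addAll(names);
			done.await(5, TimeUnit.SECONDS); // wait until every event has been handled
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} finally {
			container.stop();
		}
		return deliveredOn;
	}

	public static void main(String[] args) {
		System.out.println(deliver(List.of("Ada", "Grace")));
	}
}
```

The listener never runs on the caller's thread, which is why the caller has to wait (here with a latch) before reading the results.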

Once the MessageListenerContainer is in place, MessageListeners can be registered.

// Called by the container for every event that passes the filter.
MessageListener<ChangeStreamDocument<Document>, Person> messageListener = (message) -> {
	System.out.println("Hello " + message.getBody().getFirstname());
};

// Watch the "person" collection and forward insert events only.
ChangeStreamRequest<Person> request = ChangeStreamRequest.builder()
	.collection("person")
	.filter(newAggregation(match(where("operationType").is("insert"))))
	.publishTo(messageListener)
	.build();

// The returned Subscription can later be stopped with subscription.cancel().
Subscription subscription = messageListenerContainer.register(request, Person.class);

// ...
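To make the effect of the operationType filter concrete, here is a small plain-Java sketch. It is a simplification under stated assumptions: events are modeled as flat maps (real change events nest the document under fullDocument), the filter runs in memory rather than as an aggregation stage on the server, and OperationTypeFilterSketch and greetInserted are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustration only: mimics the effect of match(where("operationType").is("insert")).
class OperationTypeFilterSketch {

	// Keeps events whose operationType is exactly "insert".
	static final Predicate<Map<String, String>> INSERTS_ONLY =
			event -> "insert".equals(event.get("operationType"));

	/** Returns one greeting per insert event; updates, deletes, and so on are dropped. */
	static List<String> greetInserted(List<Map<String, String>> events) {
		return events.stream()
				.filter(INSERTS_ONLY)
				.map(event -> "Hello " + event.get("firstname"))
				.collect(Collectors.toList());
	}

	public static void main(String[] args) {
		List<Map<String, String>> events = List.of(
				Map.of("operationType", "insert", "firstname", "Ada"),
				Map.of("operationType", "update", "firstname", "Grace"),
				Map.of("operationType", "delete", "firstname", "Edsger"),
				Map.of("operationType", "insert", "firstname", "Barbara"));
		greetInserted(events).forEach(System.out::println);
	}
}
```

With the sample events in main, only the two insert events produce a greeting.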

Reactive Style

Change stream events can be consumed directly via a Flux connected to the change stream.

Flux<ChangeStreamEvent<Person>> changeStream = reactiveTemplate
	.changeStream(newAggregation(match(where("operationType").is("insert"))),
				Person.class, ChangeStreamOptions.empty(), "person");

// Nothing is consumed until subscribe() is called.
changeStream.doOnNext(event -> System.out.println("Hello " + event.getBody().getFirstname()))
	.subscribe();
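For readers new to push-based consumption, the same idea can be tried without any MongoDB or Reactor dependency. The sketch below uses the JDK's java.util.concurrent.Flow support (SubmissionPublisher) as a stand-in for Flux. That substitution is an assumption made for illustration, and PushStreamSketch and greetAll are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Illustration only: the JDK Flow API stands in for Reactor's Flux here.
class PushStreamSketch {

	/** Pushes each name through a publisher and collects the greetings the consumer produced. */
	static List<String> greetAll(List<String> names) {
		List<String> greetings = new CopyOnWriteArrayList<>();
		SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

		// Subscribe first; the consumer is invoked asynchronously for every submitted item.
		CompletableFuture<Void> done = publisher.consume(name -> greetings.add("Hello " + name));

		names.forEach(publisher::submit);
		publisher.close(); // signals completion once the submitted items have been delivered
		done.join();       // wait for the consumer to finish
		return greetings;
	}

	public static void main(String[] args) {
		greetAll(List.of("Ada", "Grace")).forEach(System.out::println);
	}
}
```

As with the Flux above, the producer pushes items to the consumer rather than the consumer polling for them.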