GH-143: Add an update option to MongoDB Supplier (#147)
* GH-143: Add an update option to MongoDB Supplier Fixes https://github.com/spring-cloud/stream-applications/issues/143 * * Fix Checkstyle violations * Update `README.adoc` for `mongodb-source`
This commit is contained in:
@@ -23,7 +23,10 @@ In the case of splitting you get a `Flux` which you have to subscribe in your ap
|
||||
|
||||
All configuration properties are prefixed with `mongodb.supplier`.
|
||||
|
||||
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java[MongoDBSupplierProperties]
|
||||
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java[MongoDBSupplierProperties].
|
||||
|
||||
The `queryExpression` and `updateExpression` options may use Spring Data MongoDB query DSL from the `org.springframework.data.mongodb.core.query`, such as `Query` and `Update` factories respectively.
|
||||
The `updateExpression` is optional and ca use an item from query result as a root evaluation object to extract some values to update from just fetched data.
|
||||
|
||||
## Tests
|
||||
|
||||
|
||||
@@ -32,6 +32,12 @@
|
||||
<version>${spring-cloud-function.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud.fn</groupId>
|
||||
<artifactId>config-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud.fn</groupId>
|
||||
<artifactId>splitter-function</artifactId>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2020 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -45,7 +45,7 @@ import org.springframework.messaging.Message;
|
||||
* @author David Turanski
|
||||
*/
|
||||
@Configuration
|
||||
@EnableConfigurationProperties({MongodbSupplierProperties.class})
|
||||
@EnableConfigurationProperties({ MongodbSupplierProperties.class })
|
||||
@Import(SplitterFunctionConfiguration.class)
|
||||
public class MongodbSupplierConfiguration {
|
||||
|
||||
@@ -82,11 +82,10 @@ public class MongodbSupplierConfiguration {
|
||||
/**
|
||||
* The inheritors can consider to override this method for their purpose or just adjust
|
||||
* options for the returned instance.
|
||||
*
|
||||
* @return a {@link MongoDbMessageSource} instance
|
||||
*/
|
||||
@Bean
|
||||
public MongoDbMessageSource mongoSource() {
|
||||
public MongoDbMessageSource mongoDbSource() {
|
||||
Expression queryExpression = (this.properties.getQueryExpression() != null
|
||||
? this.properties.getQueryExpression()
|
||||
: new LiteralExpression(this.properties.getQuery()));
|
||||
@@ -96,4 +95,10 @@ public class MongodbSupplierConfiguration {
|
||||
return mongoDbMessageSource;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public UpdatingMongoDbMessageSource mongoSource() {
|
||||
return new UpdatingMongoDbMessageSource(mongoDbSource(), this.mongoTemplate,
|
||||
this.properties.getUpdateExpression());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2019 the original author or authors.
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -48,6 +48,11 @@ public class MongodbSupplierProperties {
|
||||
*/
|
||||
private Expression queryExpression;
|
||||
|
||||
/**
|
||||
* The SpEL expression in MongoDB update DSL style.
|
||||
*/
|
||||
private Expression updateExpression;
|
||||
|
||||
/**
|
||||
* Whether to split the query result as individual messages.
|
||||
*/
|
||||
@@ -87,4 +92,12 @@ public class MongodbSupplierProperties {
|
||||
this.split = split;
|
||||
}
|
||||
|
||||
public Expression getUpdateExpression() {
|
||||
return this.updateExpression;
|
||||
}
|
||||
|
||||
public void setUpdateExpression(Expression updateExpression) {
|
||||
this.updateExpression = updateExpression;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.supplier.mongo;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.data.mongodb.core.query.BasicQuery;
|
||||
import org.springframework.data.mongodb.core.query.BasicUpdate;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
import org.springframework.expression.EvaluationContext;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.TypeLocator;
|
||||
import org.springframework.expression.spel.support.StandardTypeLocator;
|
||||
import org.springframework.integration.endpoint.AbstractMessageSource;
|
||||
import org.springframework.integration.expression.ExpressionUtils;
|
||||
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;
|
||||
import org.springframework.integration.mongodb.support.MongoHeaders;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* An {@link AbstractMessageSource} extension for MongoDB updates
|
||||
* based on the query result from the {@link MongoDbMessageSource} delegate.
|
||||
*
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
class UpdatingMongoDbMessageSource extends AbstractMessageSource<Object> {
|
||||
|
||||
private final MongoDbMessageSource delegate;
|
||||
|
||||
private final MongoTemplate mongoTemplate;
|
||||
|
||||
@Nullable
|
||||
private final Expression updateExpression;
|
||||
|
||||
private EvaluationContext evaluationContext;
|
||||
|
||||
|
||||
UpdatingMongoDbMessageSource(MongoDbMessageSource delegate, MongoTemplate mongoTemplate,
|
||||
@Nullable Expression updateExpression) {
|
||||
|
||||
this.delegate = delegate;
|
||||
this.mongoTemplate = mongoTemplate;
|
||||
this.updateExpression = updateExpression;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getComponentType() {
|
||||
return "mongo:updating-inbound-channel-adapter";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onInit() {
|
||||
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
|
||||
TypeLocator typeLocator = this.evaluationContext.getTypeLocator();
|
||||
if (typeLocator instanceof StandardTypeLocator) {
|
||||
//Register MongoDB query API package so FQCN can be avoided in query-expression.
|
||||
((StandardTypeLocator) typeLocator).registerImport("org.springframework.data.mongodb.core.query");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object doReceive() {
|
||||
final Message<Object> message = this.delegate.receive();
|
||||
if (message != null && this.updateExpression != null) {
|
||||
String collectionName = message.getHeaders().get(MongoHeaders.COLLECTION_NAME, String.class);
|
||||
Object payload = message.getPayload();
|
||||
List<?> dataToUpdate;
|
||||
if (payload instanceof List) {
|
||||
dataToUpdate = (List<?>) payload;
|
||||
}
|
||||
else {
|
||||
dataToUpdate = Collections.singletonList(payload);
|
||||
}
|
||||
|
||||
for (Object data : dataToUpdate) {
|
||||
Query query = new BasicQuery((String) data);
|
||||
|
||||
Object value = this.updateExpression.getValue(this.evaluationContext, data);
|
||||
Assert.notNull(value, "'updateExpression' must not evaluate to null");
|
||||
Update update;
|
||||
if (value instanceof String) {
|
||||
update = new BasicUpdate((String) value);
|
||||
}
|
||||
else if (value instanceof Update) {
|
||||
update = ((Update) value);
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("'updateExpression' must evaluate to String " +
|
||||
"or org.springframework.data.mongodb.core.query.Update");
|
||||
}
|
||||
|
||||
this.mongoTemplate.updateFirst(query, update, collectionName);
|
||||
}
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2020 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -40,7 +40,10 @@ import static org.assertj.core.api.Assertions.entry;
|
||||
|
||||
@SpringBootTest(properties = {
|
||||
"spring.data.mongodb.port=0",
|
||||
"mongodb.supplier.collection=testing"})
|
||||
"mongodb.supplier.collection=testing",
|
||||
"mongodb.supplier.query={ name: { $exists: true }}",
|
||||
"mongodb.supplier.update-expression='{ $unset: { name: 0 } }'"
|
||||
})
|
||||
class MongodbSupplierApplicationTests {
|
||||
|
||||
private ObjectMapper objectMapper = new ObjectMapper();
|
||||
@@ -78,6 +81,8 @@ class MongodbSupplierApplicationTests {
|
||||
entry("name", "bar")))
|
||||
.thenCancel()
|
||||
.verify();
|
||||
|
||||
assertThat(this.mongodbSupplier.get().collectList().block()).isEmpty();
|
||||
}
|
||||
|
||||
private Map<String, Object> payload(Message<?> message) {
|
||||
|
||||
Reference in New Issue
Block a user