diff --git a/applications/source/mongodb-source/README.adoc b/applications/source/mongodb-source/README.adoc index dddfc9b4..92633ae2 100644 --- a/applications/source/mongodb-source/README.adoc +++ b/applications/source/mongodb-source/README.adoc @@ -17,6 +17,7 @@ $$mongodb.supplier.collection$$:: $$The MongoDB collection to query.$$ *($$Strin $$mongodb.supplier.query$$:: $$The MongoDB query.$$ *($$String$$, default: `$${ }$$`)* $$mongodb.supplier.query-expression$$:: $$The SpEL expression in MongoDB query DSL style.$$ *($$Expression$$, default: `$$$$`)* $$mongodb.supplier.split$$:: $$Whether to split the query result as individual messages.$$ *($$Boolean$$, default: `$$true$$`)* +$$mongodb.supplier.update-expression$$:: $$The SpEL expression in MongoDB update DSL style.$$ *($$Expression$$, default: `$$$$`)* //end::configuration-properties[] Also see the https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html[Spring Boot Documentation] for additional `MongoProperties` properties. diff --git a/functions/supplier/mongodb-supplier/README.adoc b/functions/supplier/mongodb-supplier/README.adoc index 066dc562..4fb5ba42 100644 --- a/functions/supplier/mongodb-supplier/README.adoc +++ b/functions/supplier/mongodb-supplier/README.adoc @@ -23,7 +23,10 @@ In the case of splitting you get a `Flux` which you have to subscribe in your ap All configuration properties are prefixed with `mongodb.supplier`. -For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java[MongoDBSupplierProperties] +For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java[MongoDBSupplierProperties]. + +The `queryExpression` and `updateExpression` options may use Spring Data MongoDB query DSL from the `org.springframework.data.mongodb.core.query`, such as `Query` and `Update` factories respectively. +The `updateExpression` is optional and ca use an item from query result as a root evaluation object to extract some values to update from just fetched data. ## Tests diff --git a/functions/supplier/mongodb-supplier/pom.xml b/functions/supplier/mongodb-supplier/pom.xml index 52ca71b8..38c27e97 100644 --- a/functions/supplier/mongodb-supplier/pom.xml +++ b/functions/supplier/mongodb-supplier/pom.xml @@ -32,6 +32,12 @@ ${spring-cloud-function.version} + + org.springframework.cloud.fn + config-common + ${project.version} + + org.springframework.cloud.fn splitter-function diff --git a/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java b/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java index 80c38c7b..0d3f6f37 100644 --- a/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java +++ b/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ import org.springframework.messaging.Message; * @author David Turanski */ @Configuration -@EnableConfigurationProperties({MongodbSupplierProperties.class}) +@EnableConfigurationProperties({ MongodbSupplierProperties.class }) @Import(SplitterFunctionConfiguration.class) public class MongodbSupplierConfiguration { @@ -82,11 +82,10 @@ public class MongodbSupplierConfiguration { /** * The inheritors can consider to override this method for their purpose or just adjust * options for the returned instance. - * * @return a {@link MongoDbMessageSource} instance */ @Bean - public MongoDbMessageSource mongoSource() { + public MongoDbMessageSource mongoDbSource() { Expression queryExpression = (this.properties.getQueryExpression() != null ? this.properties.getQueryExpression() : new LiteralExpression(this.properties.getQuery())); @@ -96,4 +95,10 @@ public class MongodbSupplierConfiguration { return mongoDbMessageSource; } + @Bean + public UpdatingMongoDbMessageSource mongoSource() { + return new UpdatingMongoDbMessageSource(mongoDbSource(), this.mongoTemplate, + this.properties.getUpdateExpression()); + } + } diff --git a/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java b/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java index 1d993ab7..21de5b3c 100644 --- a/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java +++ b/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,11 @@ public class MongodbSupplierProperties { */ private Expression queryExpression; + /** + * The SpEL expression in MongoDB update DSL style. + */ + private Expression updateExpression; + /** * Whether to split the query result as individual messages. */ @@ -87,4 +92,12 @@ public class MongodbSupplierProperties { this.split = split; } + public Expression getUpdateExpression() { + return this.updateExpression; + } + + public void setUpdateExpression(Expression updateExpression) { + this.updateExpression = updateExpression; + } + } diff --git a/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/UpdatingMongoDbMessageSource.java b/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/UpdatingMongoDbMessageSource.java new file mode 100644 index 00000000..7f805679 --- /dev/null +++ b/functions/supplier/mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/UpdatingMongoDbMessageSource.java @@ -0,0 +1,118 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.fn.supplier.mongo; + +import java.util.Collections; +import java.util.List; + +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.BasicQuery; +import org.springframework.data.mongodb.core.query.BasicUpdate; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.TypeLocator; +import org.springframework.expression.spel.support.StandardTypeLocator; +import org.springframework.integration.endpoint.AbstractMessageSource; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.mongodb.inbound.MongoDbMessageSource; +import org.springframework.integration.mongodb.support.MongoHeaders; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * An {@link AbstractMessageSource} extension for MongoDB updates + * based on the query result from the {@link MongoDbMessageSource} delegate. + * + * @author Artem Bilan + */ +class UpdatingMongoDbMessageSource extends AbstractMessageSource { + + private final MongoDbMessageSource delegate; + + private final MongoTemplate mongoTemplate; + + @Nullable + private final Expression updateExpression; + + private EvaluationContext evaluationContext; + + + UpdatingMongoDbMessageSource(MongoDbMessageSource delegate, MongoTemplate mongoTemplate, + @Nullable Expression updateExpression) { + + this.delegate = delegate; + this.mongoTemplate = mongoTemplate; + this.updateExpression = updateExpression; + } + + @Override + public String getComponentType() { + return "mongo:updating-inbound-channel-adapter"; + } + + @Override + protected void onInit() { + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + TypeLocator typeLocator = this.evaluationContext.getTypeLocator(); + if (typeLocator instanceof StandardTypeLocator) { + //Register MongoDB query API package so FQCN can be avoided in query-expression. + ((StandardTypeLocator) typeLocator).registerImport("org.springframework.data.mongodb.core.query"); + } + } + + @Override + protected Object doReceive() { + final Message message = this.delegate.receive(); + if (message != null && this.updateExpression != null) { + String collectionName = message.getHeaders().get(MongoHeaders.COLLECTION_NAME, String.class); + Object payload = message.getPayload(); + List dataToUpdate; + if (payload instanceof List) { + dataToUpdate = (List) payload; + } + else { + dataToUpdate = Collections.singletonList(payload); + } + + for (Object data : dataToUpdate) { + Query query = new BasicQuery((String) data); + + Object value = this.updateExpression.getValue(this.evaluationContext, data); + Assert.notNull(value, "'updateExpression' must not evaluate to null"); + Update update; + if (value instanceof String) { + update = new BasicUpdate((String) value); + } + else if (value instanceof Update) { + update = ((Update) value); + } + else { + throw new IllegalStateException("'updateExpression' must evaluate to String " + + "or org.springframework.data.mongodb.core.query.Update"); + } + + this.mongoTemplate.updateFirst(query, update, collectionName); + } + } + + return message; + } + +} diff --git a/functions/supplier/mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java b/functions/supplier/mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java index c1d4fcc8..366b08ef 100644 --- a/functions/supplier/mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java +++ b/functions/supplier/mongodb-supplier/src/test/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,10 @@ import static org.assertj.core.api.Assertions.entry; @SpringBootTest(properties = { "spring.data.mongodb.port=0", - "mongodb.supplier.collection=testing"}) + "mongodb.supplier.collection=testing", + "mongodb.supplier.query={ name: { $exists: true }}", + "mongodb.supplier.update-expression='{ $unset: { name: 0 } }'" +}) class MongodbSupplierApplicationTests { private ObjectMapper objectMapper = new ObjectMapper(); @@ -78,6 +81,8 @@ class MongodbSupplierApplicationTests { entry("name", "bar"))) .thenCancel() .verify(); + + assertThat(this.mongodbSupplier.get().collectList().block()).isEmpty(); } private Map payload(Message message) {