From 32987230c19595dacaddecfc6f46cac3b76afca9 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Mon, 25 Feb 2019 13:34:45 +0000 Subject: [PATCH] Add support for composing function of Message with plain function Fixes gh-267 at least for the most common cases. --- spring-cloud-function-context/pom.xml | 5 + .../AbstractComposableFunctionRegistry.java | 19 +++ .../context/catalog/MessageConsumer.java | 43 +++++++ .../context/catalog/MessageFunction.java | 95 +++++++++++++++ .../context/catalog/MessageSupplier.java | 63 ++++++++++ .../catalog/InMemoryFunctionCatalogTests.java | 11 +- .../context/catalog/MessageConsumerTests.java | 49 ++++++++ .../context/catalog/MessageFunctionTests.java | 111 ++++++++++++++++++ .../context/catalog/MessageSupplierTests.java | 79 +++++++++++++ 9 files changed, 469 insertions(+), 6 deletions(-) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java create mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java create mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java create mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index 2ec6d33a9..62e3223dd 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -53,6 +53,11 @@ spring-cloud-function-compiler test + + io.projectreactor + reactor-test + test + diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java index 84e8bd38e..94053c971 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -362,6 +362,12 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi FunctionType bType = bReg.getType(); Object a = aReg.getTarget(); Object b = bReg.getTarget(); + if (aType != null && bType != null) { + if (aType.isMessage() && !bType.isMessage()) { + bType = bType.message(); + b = message(b); + } + } Object composedFunction = null; if (a instanceof Supplier && b instanceof Function) { Supplier> supplier = (Supplier>) a; @@ -420,6 +426,19 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi .type(FunctionType.compose(aType, bType)); } + private Object message(Object input) { + if (input instanceof Supplier) { + return new MessageSupplier((Supplier) input); + } + if (input instanceof Consumer) { + return new MessageConsumer((Consumer) input); + } + if (input instanceof Function) { + return new MessageFunction((Function) input); + } + return input; + } + @SuppressWarnings("unchecked") private T doLookup(Class type, String name) { T function = null; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java new file mode 100644 index 000000000..5cd494346 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.function.Consumer; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import org.springframework.messaging.Message; + +/** + * @author Dave Syer + */ +public class MessageConsumer implements Consumer>> { + + private Consumer delegate; + + @SuppressWarnings("unchecked") + public MessageConsumer(Consumer input) { + this.delegate = (Consumer) input; + } + + @Override + public void accept(Publisher> input) { + Flux.from(input).map(Message::getPayload).subscribe(this.delegate::accept); + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java new file mode 100644 index 000000000..95b42bba4 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java @@ -0,0 +1,95 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.function.core.FluxConsumer; +import org.springframework.cloud.function.core.FluxFunction; +import org.springframework.cloud.function.core.FluxToMonoFunction; +import org.springframework.cloud.function.core.MonoToFluxFunction; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; + +/** + * @author Dave Syer + */ +public class MessageFunction + implements Function>, Publisher>> { + + private Function delegate; + + public MessageFunction(Function delegate) { + this.delegate = delegate; + } + + @Override + public Publisher> apply(Publisher> input) { + Flux> flux = Flux.from(input); + if (this.delegate instanceof FluxFunction) { + @SuppressWarnings("unchecked") + Function target = (Function) ((FluxFunction) this.delegate) + .getTarget(); + return flux.map( + value -> MessageBuilder.withPayload(target.apply(value.getPayload())) + .copyHeaders(value.getHeaders()).build()); + } + if (this.delegate instanceof MonoToFluxFunction) { + @SuppressWarnings("unchecked") + Function, Flux> target = ((MonoToFluxFunction) this.delegate) + .getTarget(); + return flux.next() + .flatMapMany(value -> target.apply(Mono.just(value.getPayload())) + .map(object -> MessageBuilder.withPayload(object) + .copyHeaders(value.getHeaders()).build())); + } + if (this.delegate instanceof FluxToMonoFunction) { + @SuppressWarnings("unchecked") + Function, Mono> target = ((FluxToMonoFunction) this.delegate) + .getTarget(); + AtomicReference headers = new AtomicReference<>(); + return target.apply(flux.map(messsage -> { + headers.set(messsage.getHeaders()); + return messsage.getPayload(); + })).map(payload -> MessageBuilder.withPayload(payload) + .copyHeaders(headers.get()).build()); + } + if (this.delegate instanceof FluxConsumer) { + @SuppressWarnings("unchecked") + FluxConsumer target = ((FluxConsumer) this.delegate); + AtomicReference headers = new AtomicReference<>(); + Mono mapped = target.apply(flux.map(messsage -> { + headers.set(messsage.getHeaders()); + return messsage.getPayload(); + })); + return mapped.map(value -> MessageBuilder.createMessage(null, headers.get())); + } + // TODO: cover the case that delegate is actually Function + @SuppressWarnings("unchecked") + Function function = (Function) this.delegate; + return flux.map( + value -> MessageBuilder.withPayload(function.apply(value.getPayload())) + .copyHeaders(value.getHeaders()).build()); + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java new file mode 100644 index 000000000..e658d998c --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java @@ -0,0 +1,63 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.function.core.FluxSupplier; +import org.springframework.cloud.function.core.MonoSupplier; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * @author Dave Syer + */ +public class MessageSupplier implements Supplier>> { + + private Supplier delegate; + + public MessageSupplier(Supplier delegate) { + this.delegate = delegate; + } + + @Override + public Publisher> get() { + if (this.delegate instanceof FluxSupplier) { + return ((Flux) this.delegate.get()) + .map(value -> MessageBuilder.withPayload(value).build()); + } + if (this.delegate instanceof MonoSupplier) { + return ((Mono) this.delegate.get()) + .map(value -> MessageBuilder.withPayload(value).build()); + } + Object product = this.delegate.get(); + if (product instanceof Publisher) { + return Flux.from((Publisher) product) + .map(value -> MessageBuilder.withPayload(value).build()); + } + if (product instanceof Iterable) { + return Flux.fromIterable((Iterable) product) + .map(value -> MessageBuilder.withPayload(value).build()); + } + return Mono.just(MessageBuilder.withPayload(product).build()); + } + +} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java index 1cc47c927..34922ffe7 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java @@ -18,7 +18,6 @@ package org.springframework.cloud.function.context.catalog; import java.util.function.Function; -import org.junit.Ignore; import org.junit.Test; import reactor.core.publisher.Flux; @@ -111,12 +110,10 @@ public class InMemoryFunctionCatalogTests { } @Test - @Ignore public void testFunctionCompositionMixedMessages() { FunctionRegistration upperCaseRegistration = new FunctionRegistration<>( new UpperCaseMessage(), "uppercase") .type(FunctionType.of(UpperCaseMessage.class).getType()); - // TODO: make this work with plain Reverse (not message) FunctionRegistration reverseRegistration = new FunctionRegistration<>( new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType()); InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog(); @@ -128,9 +125,11 @@ public class InMemoryFunctionCatalogTests { assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isTrue(); assertThat(lookedUpFunction).isNotNull(); - assertThat(lookedUpFunction - .apply(Flux.just(MessageBuilder.withPayload("star").build())).blockFirst() - .getPayload()).isEqualTo("RATS"); + Message message = lookedUpFunction.apply(Flux + .just(MessageBuilder.withPayload("star").setHeader("foo", "bar").build())) + .blockFirst(); + assertThat(message.getPayload()).isEqualTo("RATS"); + assertThat(message.getHeaders().get("foo")).isEqualTo("bar"); } private static class UpperCase implements Function { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java new file mode 100644 index 000000000..49fff5857 --- /dev/null +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java @@ -0,0 +1,49 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import org.junit.Test; +import reactor.core.publisher.Flux; + +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + */ +public class MessageConsumerTests { + + private List items = new ArrayList<>(); + + @Test + public void plainConsumer() { + MessageConsumer consumer = new MessageConsumer(input()); + consumer.accept(Flux + .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); + assertThat(this.items).hasSize(1); + } + + private Consumer input() { + return value -> this.items.add(value); + } + +} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java new file mode 100644 index 000000000..1418c070e --- /dev/null +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java @@ -0,0 +1,111 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.cloud.function.core.FluxConsumer; +import org.springframework.cloud.function.core.FluxFunction; +import org.springframework.cloud.function.core.FluxToMonoFunction; +import org.springframework.cloud.function.core.MonoToFluxFunction; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + */ +public class MessageFunctionTests { + + private List items = new ArrayList<>(); + + @Test + public void plainFunction() { + MessageFunction function = new MessageFunction(uppercase()); + Publisher> result = function.apply(Flux + .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); + StepVerifier.create(result).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("FOO"); + assertThat(message.getHeaders()).containsEntry("foo", "bar"); + }); + } + + @Test + public void fluxFunction() { + MessageFunction function = new MessageFunction(new FluxFunction<>(uppercase())); + Publisher> result = function.apply(Flux + .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); + StepVerifier.create(result).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("FOO"); + assertThat(message.getHeaders()).containsEntry("foo", "bar"); + }); + } + + @Test + public void fluxToMonoFunction() { + MessageFunction function = new MessageFunction( + new FluxToMonoFunction<>(flux -> flux.next().map(uppercase()))); + Publisher> result = function.apply(Flux + .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); + StepVerifier.create(result).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("FOO"); + assertThat(message.getHeaders()).containsEntry("foo", "bar"); + }); + } + + @Test + public void monoToFunction() { + MessageFunction function = new MessageFunction( + new MonoToFluxFunction<>(mono -> Flux.from(mono.map(uppercase())))); + Publisher> result = function.apply(Flux + .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); + StepVerifier.create(result).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("FOO"); + assertThat(message.getHeaders()).containsEntry("foo", "bar"); + }); + } + + @Test + public void fluxConsumer() { + MessageFunction function = new MessageFunction(new FluxConsumer<>(stash())); + Publisher> result = function.apply(Flux + .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); + StepVerifier.create(result).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo(null); + assertThat(message.getHeaders()).containsEntry("foo", "bar"); + assertThat(this.items).hasSize(1); + }); + } + + private Consumer stash() { + return value -> this.items.add(value); + } + + private Function uppercase() { + return value -> value.toUpperCase(); + } + +} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java new file mode 100644 index 000000000..ccb7ac779 --- /dev/null +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java @@ -0,0 +1,79 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.Arrays; +import java.util.Collection; +import java.util.function.Supplier; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + */ +public class MessageSupplierTests { + + @Test + public void plainSupplier() { + MessageSupplier supplier = new MessageSupplier(input()); + StepVerifier.create(supplier.get()).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getHeaders()).isEmpty(); + }); + } + + @Test + public void collectionSupplier() { + MessageSupplier supplier = new MessageSupplier(inputs()); + StepVerifier.create(supplier.get()).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getHeaders()).isEmpty(); + }).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("bar"); + assertThat(message.getHeaders()).isEmpty(); + }); + } + + @Test + public void fluxSupplier() { + MessageSupplier supplier = new MessageSupplier(flux()); + StepVerifier.create(supplier.get()).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("foo"); + assertThat(message.getHeaders()).isEmpty(); + }).assertNext(message -> { + assertThat(message.getPayload()).isEqualTo("bar"); + assertThat(message.getHeaders()).isEmpty(); + }); + } + + private Supplier input() { + return () -> "foo"; + } + + private Supplier> inputs() { + return () -> Arrays.asList("foo", "bar"); + } + + private Supplier> flux() { + return () -> Flux.just("foo", "bar"); + } + +}