diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml
index 2ec6d33a9..62e3223dd 100644
--- a/spring-cloud-function-context/pom.xml
+++ b/spring-cloud-function-context/pom.xml
@@ -53,6 +53,11 @@
spring-cloud-function-compiler
test
+
+ io.projectreactor
+ reactor-test
+ test
+
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java
index 84e8bd38e..94053c971 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java
@@ -362,6 +362,12 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
FunctionType bType = bReg.getType();
Object a = aReg.getTarget();
Object b = bReg.getTarget();
+ if (aType != null && bType != null) {
+ if (aType.isMessage() && !bType.isMessage()) {
+ bType = bType.message();
+ b = message(b);
+ }
+ }
Object composedFunction = null;
if (a instanceof Supplier && b instanceof Function) {
Supplier> supplier = (Supplier>) a;
@@ -420,6 +426,19 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
.type(FunctionType.compose(aType, bType));
}
+ private Object message(Object input) {
+ if (input instanceof Supplier) {
+ return new MessageSupplier((Supplier>) input);
+ }
+ if (input instanceof Consumer) {
+ return new MessageConsumer((Consumer>) input);
+ }
+ if (input instanceof Function) {
+ return new MessageFunction((Function, ?>) input);
+ }
+ return input;
+ }
+
@SuppressWarnings("unchecked")
private T doLookup(Class> type, String name) {
T function = null;
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java
new file mode 100644
index 000000000..5cd494346
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.catalog;
+
+import java.util.function.Consumer;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+
+import org.springframework.messaging.Message;
+
+/**
+ * @author Dave Syer
+ */
+public class MessageConsumer implements Consumer>> {
+
+ private Consumer delegate;
+
+ @SuppressWarnings("unchecked")
+ public MessageConsumer(Consumer> input) {
+ this.delegate = (Consumer) input;
+ }
+
+ @Override
+ public void accept(Publisher> input) {
+ Flux.from(input).map(Message::getPayload).subscribe(this.delegate::accept);
+ }
+
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java
new file mode 100644
index 000000000..95b42bba4
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.catalog;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.cloud.function.core.FluxConsumer;
+import org.springframework.cloud.function.core.FluxFunction;
+import org.springframework.cloud.function.core.FluxToMonoFunction;
+import org.springframework.cloud.function.core.MonoToFluxFunction;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+
+/**
+ * @author Dave Syer
+ */
+public class MessageFunction
+ implements Function>, Publisher>> {
+
+ private Function, ?> delegate;
+
+ public MessageFunction(Function, ?> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Publisher> apply(Publisher> input) {
+ Flux> flux = Flux.from(input);
+ if (this.delegate instanceof FluxFunction) {
+ @SuppressWarnings("unchecked")
+ Function target = (Function) ((FluxFunction, ?>) this.delegate)
+ .getTarget();
+ return flux.map(
+ value -> MessageBuilder.withPayload(target.apply(value.getPayload()))
+ .copyHeaders(value.getHeaders()).build());
+ }
+ if (this.delegate instanceof MonoToFluxFunction) {
+ @SuppressWarnings("unchecked")
+ Function, Flux> target = ((MonoToFluxFunction) this.delegate)
+ .getTarget();
+ return flux.next()
+ .flatMapMany(value -> target.apply(Mono.just(value.getPayload()))
+ .map(object -> MessageBuilder.withPayload(object)
+ .copyHeaders(value.getHeaders()).build()));
+ }
+ if (this.delegate instanceof FluxToMonoFunction) {
+ @SuppressWarnings("unchecked")
+ Function, Mono> target = ((FluxToMonoFunction) this.delegate)
+ .getTarget();
+ AtomicReference headers = new AtomicReference<>();
+ return target.apply(flux.map(messsage -> {
+ headers.set(messsage.getHeaders());
+ return messsage.getPayload();
+ })).map(payload -> MessageBuilder.withPayload(payload)
+ .copyHeaders(headers.get()).build());
+ }
+ if (this.delegate instanceof FluxConsumer) {
+ @SuppressWarnings("unchecked")
+ FluxConsumer target = ((FluxConsumer) this.delegate);
+ AtomicReference headers = new AtomicReference<>();
+ Mono mapped = target.apply(flux.map(messsage -> {
+ headers.set(messsage.getHeaders());
+ return messsage.getPayload();
+ }));
+ return mapped.map(value -> MessageBuilder.createMessage(null, headers.get()));
+ }
+ // TODO: cover the case that delegate is actually Function
+ @SuppressWarnings("unchecked")
+ Function function = (Function) this.delegate;
+ return flux.map(
+ value -> MessageBuilder.withPayload(function.apply(value.getPayload()))
+ .copyHeaders(value.getHeaders()).build());
+ }
+
+}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java
new file mode 100644
index 000000000..e658d998c
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.catalog;
+
+import java.util.function.Supplier;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.cloud.function.core.FluxSupplier;
+import org.springframework.cloud.function.core.MonoSupplier;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+/**
+ * @author Dave Syer
+ */
+public class MessageSupplier implements Supplier>> {
+
+ private Supplier> delegate;
+
+ public MessageSupplier(Supplier> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Publisher> get() {
+ if (this.delegate instanceof FluxSupplier) {
+ return ((Flux>) this.delegate.get())
+ .map(value -> MessageBuilder.withPayload(value).build());
+ }
+ if (this.delegate instanceof MonoSupplier) {
+ return ((Mono>) this.delegate.get())
+ .map(value -> MessageBuilder.withPayload(value).build());
+ }
+ Object product = this.delegate.get();
+ if (product instanceof Publisher) {
+ return Flux.from((Publisher>) product)
+ .map(value -> MessageBuilder.withPayload(value).build());
+ }
+ if (product instanceof Iterable) {
+ return Flux.fromIterable((Iterable>) product)
+ .map(value -> MessageBuilder.withPayload(value).build());
+ }
+ return Mono.just(MessageBuilder.withPayload(product).build());
+ }
+
+}
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java
index 1cc47c927..34922ffe7 100644
--- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java
@@ -18,7 +18,6 @@ package org.springframework.cloud.function.context.catalog;
import java.util.function.Function;
-import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
@@ -111,12 +110,10 @@ public class InMemoryFunctionCatalogTests {
}
@Test
- @Ignore
public void testFunctionCompositionMixedMessages() {
FunctionRegistration upperCaseRegistration = new FunctionRegistration<>(
new UpperCaseMessage(), "uppercase")
.type(FunctionType.of(UpperCaseMessage.class).getType());
- // TODO: make this work with plain Reverse (not message)
FunctionRegistration reverseRegistration = new FunctionRegistration<>(
new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType());
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
@@ -128,9 +125,11 @@ public class InMemoryFunctionCatalogTests {
assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isTrue();
assertThat(lookedUpFunction).isNotNull();
- assertThat(lookedUpFunction
- .apply(Flux.just(MessageBuilder.withPayload("star").build())).blockFirst()
- .getPayload()).isEqualTo("RATS");
+ Message message = lookedUpFunction.apply(Flux
+ .just(MessageBuilder.withPayload("star").setHeader("foo", "bar").build()))
+ .blockFirst();
+ assertThat(message.getPayload()).isEqualTo("RATS");
+ assertThat(message.getHeaders().get("foo")).isEqualTo("bar");
}
private static class UpperCase implements Function {
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java
new file mode 100644
index 000000000..49fff5857
--- /dev/null
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.catalog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+
+import org.springframework.messaging.support.MessageBuilder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Dave Syer
+ */
+public class MessageConsumerTests {
+
+ private List items = new ArrayList<>();
+
+ @Test
+ public void plainConsumer() {
+ MessageConsumer consumer = new MessageConsumer(input());
+ consumer.accept(Flux
+ .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()));
+ assertThat(this.items).hasSize(1);
+ }
+
+ private Consumer input() {
+ return value -> this.items.add(value);
+ }
+
+}
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java
new file mode 100644
index 000000000..1418c070e
--- /dev/null
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.catalog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import org.springframework.cloud.function.core.FluxConsumer;
+import org.springframework.cloud.function.core.FluxFunction;
+import org.springframework.cloud.function.core.FluxToMonoFunction;
+import org.springframework.cloud.function.core.MonoToFluxFunction;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Dave Syer
+ */
+public class MessageFunctionTests {
+
+ private List items = new ArrayList<>();
+
+ @Test
+ public void plainFunction() {
+ MessageFunction function = new MessageFunction(uppercase());
+ Publisher> result = function.apply(Flux
+ .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()));
+ StepVerifier.create(result).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("FOO");
+ assertThat(message.getHeaders()).containsEntry("foo", "bar");
+ });
+ }
+
+ @Test
+ public void fluxFunction() {
+ MessageFunction function = new MessageFunction(new FluxFunction<>(uppercase()));
+ Publisher> result = function.apply(Flux
+ .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()));
+ StepVerifier.create(result).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("FOO");
+ assertThat(message.getHeaders()).containsEntry("foo", "bar");
+ });
+ }
+
+ @Test
+ public void fluxToMonoFunction() {
+ MessageFunction function = new MessageFunction(
+ new FluxToMonoFunction<>(flux -> flux.next().map(uppercase())));
+ Publisher> result = function.apply(Flux
+ .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()));
+ StepVerifier.create(result).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("FOO");
+ assertThat(message.getHeaders()).containsEntry("foo", "bar");
+ });
+ }
+
+ @Test
+ public void monoToFunction() {
+ MessageFunction function = new MessageFunction(
+ new MonoToFluxFunction<>(mono -> Flux.from(mono.map(uppercase()))));
+ Publisher> result = function.apply(Flux
+ .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()));
+ StepVerifier.create(result).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("FOO");
+ assertThat(message.getHeaders()).containsEntry("foo", "bar");
+ });
+ }
+
+ @Test
+ public void fluxConsumer() {
+ MessageFunction function = new MessageFunction(new FluxConsumer<>(stash()));
+ Publisher> result = function.apply(Flux
+ .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build()));
+ StepVerifier.create(result).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo(null);
+ assertThat(message.getHeaders()).containsEntry("foo", "bar");
+ assertThat(this.items).hasSize(1);
+ });
+ }
+
+ private Consumer stash() {
+ return value -> this.items.add(value);
+ }
+
+ private Function uppercase() {
+ return value -> value.toUpperCase();
+ }
+
+}
diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java
new file mode 100644
index 000000000..ccb7ac779
--- /dev/null
+++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.catalog;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Dave Syer
+ */
+public class MessageSupplierTests {
+
+ @Test
+ public void plainSupplier() {
+ MessageSupplier supplier = new MessageSupplier(input());
+ StepVerifier.create(supplier.get()).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("foo");
+ assertThat(message.getHeaders()).isEmpty();
+ });
+ }
+
+ @Test
+ public void collectionSupplier() {
+ MessageSupplier supplier = new MessageSupplier(inputs());
+ StepVerifier.create(supplier.get()).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("foo");
+ assertThat(message.getHeaders()).isEmpty();
+ }).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("bar");
+ assertThat(message.getHeaders()).isEmpty();
+ });
+ }
+
+ @Test
+ public void fluxSupplier() {
+ MessageSupplier supplier = new MessageSupplier(flux());
+ StepVerifier.create(supplier.get()).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("foo");
+ assertThat(message.getHeaders()).isEmpty();
+ }).assertNext(message -> {
+ assertThat(message.getPayload()).isEqualTo("bar");
+ assertThat(message.getHeaders()).isEmpty();
+ });
+ }
+
+ private Supplier input() {
+ return () -> "foo";
+ }
+
+ private Supplier> inputs() {
+ return () -> Arrays.asList("foo", "bar");
+ }
+
+ private Supplier> flux() {
+ return () -> Flux.just("foo", "bar");
+ }
+
+}