diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index 2d96c65ef..af264fe2b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -41,7 +41,9 @@ import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.function.registry.FunctionCatalog; +import org.springframework.cloud.function.support.FluxConsumer; import org.springframework.cloud.function.support.FluxFunction; +import org.springframework.cloud.function.support.FluxSupplier; import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -113,6 +115,11 @@ public class ContextFunctionCatalogAutoConfiguration { Supplier> supplier = (Supplier>) target; return wrapSupplier(supplier, mapper, key); } + else if (!isFluxSupplier(key, target)) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + FluxSupplier value = new FluxSupplier(target); + return value; + } else { return target; } @@ -167,6 +174,11 @@ public class ContextFunctionCatalogAutoConfiguration { Consumer> consumer = (Consumer>) target; return wrapConsumer(consumer, mapper, key); } + else if (!isFluxConsumer(key, target)) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + FluxConsumer value = new FluxConsumer(target); + return value; + } else { return target; } @@ -211,6 +223,48 @@ public class ContextFunctionCatalogAutoConfiguration { return FunctionUtils.isFluxFunction(function); } + private boolean isFluxConsumer(String name, Consumer function) { + if (this.registry.containsBeanDefinition(name)) { + BeanDefinition beanDefinition = this.registry.getBeanDefinition(name); + Object source = beanDefinition.getSource(); + if (source instanceof StandardMethodMetadata) { + StandardMethodMetadata metadata = (StandardMethodMetadata) source; + Type returnType = metadata.getIntrospectedMethod() + .getGenericReturnType(); + if (returnType instanceof ParameterizedType) { + Type[] types = ((ParameterizedType) returnType) + .getActualTypeArguments(); + if (types != null && types.length == 1) { + return (types[0].getTypeName() + .startsWith(Flux.class.getName())); + } + } + } + } + return FunctionUtils.isFluxConsumer(function); + } + + private boolean isFluxSupplier(String name, Supplier function) { + if (this.registry.containsBeanDefinition(name)) { + BeanDefinition beanDefinition = this.registry.getBeanDefinition(name); + Object source = beanDefinition.getSource(); + if (source instanceof StandardMethodMetadata) { + StandardMethodMetadata metadata = (StandardMethodMetadata) source; + Type returnType = metadata.getIntrospectedMethod() + .getGenericReturnType(); + if (returnType instanceof ParameterizedType) { + Type[] types = ((ParameterizedType) returnType) + .getActualTypeArguments(); + if (types != null && types.length == 1) { + return (types[0].getTypeName() + .startsWith(Flux.class.getName())); + } + } + } + } + return FunctionUtils.isFluxSupplier(function); + } + private boolean isGenericSupplier(ConfigurableListableBeanFactory factory, String name) { return factory.isTypeMatch(name, diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/ConsumerProxy.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/ConsumerProxy.java new file mode 100644 index 000000000..856689419 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/ConsumerProxy.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.util.function.Consumer; + +/** + * @author Mark Fisher + * + * @param output type of target Consumer + */ +public interface ConsumerProxy extends Consumer { + + default boolean isFluxConsumer() { + return FunctionUtils.isFluxConsumer(getTarget()); + } + + Consumer getTarget(); +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxConsumer.java new file mode 100644 index 000000000..a9a928999 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxConsumer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.util.function.Consumer; + +import reactor.core.publisher.Flux; + +/** + * {@link Consumer} implementation that wraps a target Consumer so that the target's + * simple input type will be wrapped as a {@link Flux} instance. + * + * @author Dave Syer + * + * @param input type of target consumer + */ +public class FluxConsumer implements Consumer> { + + private final Consumer function; + + public FluxConsumer(Consumer function) { + this.function = function; + } + + @Override + public void accept(Flux input) { + input.subscribe(t -> function.accept(t)); + } +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java index a391c6e28..96c6f1576 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java @@ -39,6 +39,6 @@ public class FluxFunction implements Function, Flux> { @Override public Flux apply(Flux input) { - return input.map(i->this.function.apply(i)); + return input.map(i -> this.function.apply(i)); } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java index 82dda14bf..5c3361f12 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java @@ -22,6 +22,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -37,7 +38,20 @@ public abstract class FunctionUtils { private static final String FLUX_CLASS_NAME = Flux.class.getName(); - private FunctionUtils() {} + private FunctionUtils() { + } + + @SuppressWarnings("rawtypes") + public static boolean isFluxConsumer(Consumer consumer) { + if (consumer instanceof ConsumerProxy) { + return ((ConsumerProxy) consumer).isFluxConsumer(); + } + String[] types = getParameterizedTypeNames(consumer, Consumer.class); + if (ObjectUtils.isEmpty(types)) { + return false; + } + return (types[0].startsWith(FLUX_CLASS_NAME)); + } @SuppressWarnings("rawtypes") public static boolean isFluxSupplier(Supplier supplier) { @@ -60,14 +74,17 @@ public abstract class FunctionUtils { if (ObjectUtils.isEmpty(types) || types.length != 2) { return false; } - return (types[0].startsWith(FLUX_CLASS_NAME) && types[1].startsWith(FLUX_CLASS_NAME)); + return (types[0].startsWith(FLUX_CLASS_NAME) + && types[1].startsWith(FLUX_CLASS_NAME)); } - private static String[] getParameterizedTypeNames(Object source, Class interfaceClass) { + private static String[] getParameterizedTypeNames(Object source, + Class interfaceClass) { Type[] genericInterfaces = source.getClass().getGenericInterfaces(); for (Type genericInterface : genericInterfaces) { - if ((genericInterface instanceof ParameterizedType) - && interfaceClass.getTypeName().equals(((ParameterizedType) genericInterface).getRawType().getTypeName())) { + if ((genericInterface instanceof ParameterizedType) && interfaceClass + .getTypeName().equals(((ParameterizedType) genericInterface) + .getRawType().getTypeName())) { ParameterizedType type = (ParameterizedType) genericInterface; Type[] args = type.getActualTypeArguments(); if (args != null) { @@ -88,8 +105,10 @@ public abstract class FunctionUtils { return null; } ReflectionUtils.makeAccessible(method); - SerializedLambda serializedLambda = (SerializedLambda) ReflectionUtils.invokeMethod(method, source); - String signature = serializedLambda.getImplMethodSignature().replaceAll("[()]", ""); + SerializedLambda serializedLambda = (SerializedLambda) ReflectionUtils + .invokeMethod(method, source); + String signature = serializedLambda.getImplMethodSignature().replaceAll("[()]", + ""); List typeNames = new ArrayList<>(); for (String types : signature.split(";")) { typeNames.add(types.substring(1).replace('/', '.')); diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index a7a16a5dc..70b1dcd0b 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -115,6 +115,14 @@ public class RestApplicationTests { assertThat(result.getBody()).isEqualTo("foobar"); } + @Test + public void bareWords() throws Exception { + ResponseEntity result = rest + .exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getBody()).isEqualTo("foobar"); + } + @Test public void updates() throws Exception { ResponseEntity result = rest.exchange( @@ -124,6 +132,16 @@ public class RestApplicationTests { assertThat(result.getBody()).isEqualTo("onetwo"); } + @Test + public void bareUpdates() throws Exception { + ResponseEntity result = rest.exchange( + RequestEntity.post(new URI("/bareUpdates")).body("one\ntwo"), + String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo("onetwo"); + } + @Test public void timeoutJson() throws Exception { assertThat(rest @@ -184,6 +202,12 @@ public class RestApplicationTests { .isEqualTo("[FOO][BAR]"); } + @Test + public void bareUppercase() { + assertThat(rest.postForObject("/bareUppercase", "foo\nbar", String.class)) + .isEqualTo("[FOO][BAR]"); + } + @Test public void transform() { assertThat(rest.postForObject("/transform", "foo\nbar", String.class)) @@ -270,6 +294,11 @@ public class RestApplicationTests { .map(value -> "[" + value.trim().toUpperCase() + "]"); } + @Bean + public Function bareUppercase() { + return value -> "[" + value.trim().toUpperCase() + "]"; + } + @Bean public Function, Flux> wrap() { return flux -> flux.log().map(value -> ".." + value + ".."); @@ -294,11 +323,21 @@ public class RestApplicationTests { return () -> Flux.fromArray(new String[] { "foo", "bar" }); } + @Bean + public Supplier> bareWords() { + return () -> Arrays.asList("foo", "bar"); + } + @Bean public Consumer> updates() { return flux -> flux.subscribe(value -> list.add(value)); } + @Bean + public Consumer bareUpdates() { + return value -> list.add(value); + } + @Bean public Supplier> bang() { return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {