Extend support for simple types to Consumer and Supplier

The function catalog now always wraps beans that deal with non-flux
generic types.
This commit is contained in:
Dave Syer
2017-03-29 13:09:36 +01:00
parent 07c9d1c460
commit 2b88eaeb08
6 changed files with 196 additions and 8 deletions

View File

@@ -41,7 +41,9 @@ import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.function.support.FluxConsumer;
import org.springframework.cloud.function.support.FluxFunction;
import org.springframework.cloud.function.support.FluxSupplier;
import org.springframework.cloud.function.support.FunctionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -113,6 +115,11 @@ public class ContextFunctionCatalogAutoConfiguration {
Supplier<Flux<?>> supplier = (Supplier<Flux<?>>) target;
return wrapSupplier(supplier, mapper, key);
}
else if (!isFluxSupplier(key, target)) {
@SuppressWarnings({ "unchecked", "rawtypes" })
FluxSupplier value = new FluxSupplier(target);
return value;
}
else {
return target;
}
@@ -167,6 +174,11 @@ public class ContextFunctionCatalogAutoConfiguration {
Consumer<Flux<?>> consumer = (Consumer<Flux<?>>) target;
return wrapConsumer(consumer, mapper, key);
}
else if (!isFluxConsumer(key, target)) {
@SuppressWarnings({ "unchecked", "rawtypes" })
FluxConsumer value = new FluxConsumer(target);
return value;
}
else {
return target;
}
@@ -211,6 +223,48 @@ public class ContextFunctionCatalogAutoConfiguration {
return FunctionUtils.isFluxFunction(function);
}
private boolean isFluxConsumer(String name, Consumer<?> function) {
if (this.registry.containsBeanDefinition(name)) {
BeanDefinition beanDefinition = this.registry.getBeanDefinition(name);
Object source = beanDefinition.getSource();
if (source instanceof StandardMethodMetadata) {
StandardMethodMetadata metadata = (StandardMethodMetadata) source;
Type returnType = metadata.getIntrospectedMethod()
.getGenericReturnType();
if (returnType instanceof ParameterizedType) {
Type[] types = ((ParameterizedType) returnType)
.getActualTypeArguments();
if (types != null && types.length == 1) {
return (types[0].getTypeName()
.startsWith(Flux.class.getName()));
}
}
}
}
return FunctionUtils.isFluxConsumer(function);
}
private boolean isFluxSupplier(String name, Supplier<?> function) {
if (this.registry.containsBeanDefinition(name)) {
BeanDefinition beanDefinition = this.registry.getBeanDefinition(name);
Object source = beanDefinition.getSource();
if (source instanceof StandardMethodMetadata) {
StandardMethodMetadata metadata = (StandardMethodMetadata) source;
Type returnType = metadata.getIntrospectedMethod()
.getGenericReturnType();
if (returnType instanceof ParameterizedType) {
Type[] types = ((ParameterizedType) returnType)
.getActualTypeArguments();
if (types != null && types.length == 1) {
return (types[0].getTypeName()
.startsWith(Flux.class.getName()));
}
}
}
}
return FunctionUtils.isFluxSupplier(function);
}
private boolean isGenericSupplier(ConfigurableListableBeanFactory factory,
String name) {
return factory.isTypeMatch(name,

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.support;
import java.util.function.Consumer;
/**
* @author Mark Fisher
*
* @param <T> output type of target Consumer
*/
public interface ConsumerProxy<T> extends Consumer<T> {
default boolean isFluxConsumer() {
return FunctionUtils.isFluxConsumer(getTarget());
}
Consumer<T> getTarget();
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.support;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
/**
* {@link Consumer} implementation that wraps a target Consumer so that the target's
* simple input type will be wrapped as a {@link Flux} instance.
*
* @author Dave Syer
*
* @param <T> input type of target consumer
*/
public class FluxConsumer<T> implements Consumer<Flux<T>> {
private final Consumer<T> function;
public FluxConsumer(Consumer<T> function) {
this.function = function;
}
@Override
public void accept(Flux<T> input) {
input.subscribe(t -> function.accept(t));
}
}

View File

@@ -39,6 +39,6 @@ public class FluxFunction<T, R> implements Function<Flux<T>, Flux<R>> {
@Override
public Flux<R> apply(Flux<T> input) {
return input.map(i->this.function.apply(i));
return input.map(i -> this.function.apply(i));
}
}

View File

@@ -22,6 +22,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -37,7 +38,20 @@ public abstract class FunctionUtils {
private static final String FLUX_CLASS_NAME = Flux.class.getName();
private FunctionUtils() {}
private FunctionUtils() {
}
@SuppressWarnings("rawtypes")
public static boolean isFluxConsumer(Consumer<?> consumer) {
if (consumer instanceof ConsumerProxy) {
return ((ConsumerProxy) consumer).isFluxConsumer();
}
String[] types = getParameterizedTypeNames(consumer, Consumer.class);
if (ObjectUtils.isEmpty(types)) {
return false;
}
return (types[0].startsWith(FLUX_CLASS_NAME));
}
@SuppressWarnings("rawtypes")
public static boolean isFluxSupplier(Supplier<?> supplier) {
@@ -60,14 +74,17 @@ public abstract class FunctionUtils {
if (ObjectUtils.isEmpty(types) || types.length != 2) {
return false;
}
return (types[0].startsWith(FLUX_CLASS_NAME) && types[1].startsWith(FLUX_CLASS_NAME));
return (types[0].startsWith(FLUX_CLASS_NAME)
&& types[1].startsWith(FLUX_CLASS_NAME));
}
private static String[] getParameterizedTypeNames(Object source, Class<?> interfaceClass) {
private static String[] getParameterizedTypeNames(Object source,
Class<?> interfaceClass) {
Type[] genericInterfaces = source.getClass().getGenericInterfaces();
for (Type genericInterface : genericInterfaces) {
if ((genericInterface instanceof ParameterizedType)
&& interfaceClass.getTypeName().equals(((ParameterizedType) genericInterface).getRawType().getTypeName())) {
if ((genericInterface instanceof ParameterizedType) && interfaceClass
.getTypeName().equals(((ParameterizedType) genericInterface)
.getRawType().getTypeName())) {
ParameterizedType type = (ParameterizedType) genericInterface;
Type[] args = type.getActualTypeArguments();
if (args != null) {
@@ -88,8 +105,10 @@ public abstract class FunctionUtils {
return null;
}
ReflectionUtils.makeAccessible(method);
SerializedLambda serializedLambda = (SerializedLambda) ReflectionUtils.invokeMethod(method, source);
String signature = serializedLambda.getImplMethodSignature().replaceAll("[()]", "");
SerializedLambda serializedLambda = (SerializedLambda) ReflectionUtils
.invokeMethod(method, source);
String signature = serializedLambda.getImplMethodSignature().replaceAll("[()]",
"");
List<String> typeNames = new ArrayList<>();
for (String types : signature.split(";")) {
typeNames.add(types.substring(1).replace('/', '.'));

View File

@@ -115,6 +115,14 @@ public class RestApplicationTests {
assertThat(result.getBody()).isEqualTo("foobar");
}
@Test
public void bareWords() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/bareWords")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("foobar");
}
@Test
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
@@ -124,6 +132,16 @@ public class RestApplicationTests {
assertThat(result.getBody()).isEqualTo("onetwo");
}
@Test
public void bareUpdates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/bareUpdates")).body("one\ntwo"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("onetwo");
}
@Test
public void timeoutJson() throws Exception {
assertThat(rest
@@ -184,6 +202,12 @@ public class RestApplicationTests {
.isEqualTo("[FOO][BAR]");
}
@Test
public void bareUppercase() {
assertThat(rest.postForObject("/bareUppercase", "foo\nbar", String.class))
.isEqualTo("[FOO][BAR]");
}
@Test
public void transform() {
assertThat(rest.postForObject("/transform", "foo\nbar", String.class))
@@ -270,6 +294,11 @@ public class RestApplicationTests {
.map(value -> "[" + value.trim().toUpperCase() + "]");
}
@Bean
public Function<String, String> bareUppercase() {
return value -> "[" + value.trim().toUpperCase() + "]";
}
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
@@ -294,11 +323,21 @@ public class RestApplicationTests {
return () -> Flux.fromArray(new String[] { "foo", "bar" });
}
@Bean
public Supplier<List<String>> bareWords() {
return () -> Arrays.asList("foo", "bar");
}
@Bean
public Consumer<Flux<String>> updates() {
return flux -> flux.subscribe(value -> list.add(value));
}
@Bean
public Consumer<String> bareUpdates() {
return value -> list.add(value);
}
@Bean
public Supplier<Flux<String>> bang() {
return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {