Add FunctionType abstraction and test it

This commit is contained in:
Dave Syer
2018-02-26 09:56:02 +00:00
parent 42a7babb7d
commit 38f6caf4dc
4 changed files with 350 additions and 14 deletions

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.context;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -38,6 +39,8 @@ public class FunctionRegistration<T> {
private final Map<String, String> properties = new LinkedHashMap<>();
private Type type;
public FunctionRegistration(T target) {
this.target = target;
}
@@ -50,11 +53,15 @@ public class FunctionRegistration<T> {
return names;
}
public Type getType() {
return type;
}
/**
* Will set the names for this registration clearing all
* previous names first. If you want to add a name or set or
* names to the existing set of names use {@link #names(Collection)}
* or {@link #name(String)} or {@link #names(String...)} operations.
* Will set the names for this registration clearing all previous names first. If you
* want to add a name or set or names to the existing set of names use
* {@link #names(Collection)} or {@link #name(String)} or {@link #names(String...)}
* operations.
* @param names
*/
public void setNames(Set<String> names) {
@@ -71,11 +78,15 @@ public class FunctionRegistration<T> {
return this;
}
public FunctionRegistration<T> type(Type type) {
this.type = type;
return this;
}
/**
* Allows to override the target of this registration with a new target
* that typically wraps the original target.
* This typically happens when original target is wrapped into its {@link Flux}
* counterpart (e.g., Function into FluxFunction)
* Allows to override the target of this registration with a new target that typically
* wraps the original target. This typically happens when original target is wrapped
* into its {@link Flux} counterpart (e.g., Function into FluxFunction)
* @param target new target
* @return this registration with new target
*/

View File

@@ -0,0 +1,165 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FunctionType {
private Type type;
public FunctionType(Type type) {
this.type = type;
}
public Class<?> getInputWrapper() {
return findType(ParamType.INPUT_WRAPPER);
}
public Class<?> getOutputWrapper() {
return findType(ParamType.OUTPUT_WRAPPER);
}
public Class<?> getInputType() {
return findType(ParamType.INPUT);
}
public Class<?> getOutputType() {
return findType(ParamType.OUTPUT);
}
public boolean isMessage() {
Class<?> inputType = findType(ParamType.INPUT_INNER_WRAPPER);
Class<?> outputType = findType(ParamType.OUTPUT_INNER_WRAPPER);
return inputType.getName().startsWith(Message.class.getName())
|| Message.class.isAssignableFrom(inputType)
|| outputType.getName().startsWith(Message.class.getName())
|| Message.class.isAssignableFrom(outputType);
}
private Class<?> findType(ParamType paramType) {
int index = paramType.isOutput() ? 1 : 0;
Type type = this.type;
if (type instanceof Class) {
for (Type iface : ((Class<?>) type).getGenericInterfaces()) {
if (iface.getTypeName().startsWith("java.util.function")) {
type = iface;
break;
}
}
}
Type param = extractType(type, paramType, index);
if (param != null) {
Class<?> result = extractClass(param, paramType);
if (result != null) {
return result;
}
}
return Object.class;
}
private Class<?> extractClass(Type param, ParamType paramType) {
if (param instanceof ParameterizedType) {
ParameterizedType concrete = (ParameterizedType) param;
param = concrete.getRawType();
}
if (param == null) {
// Last ditch attempt to guess: Flux<String>
if (paramType.isWrapper()) {
param = Flux.class;
}
else {
param = String.class;
}
}
Class<?> result = param instanceof Class ? (Class<?>) param : null;
// TODO: cache result
return result;
}
private Type extractType(Type type, ParamType paramType, int index) {
Type param;
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
if (parameterizedType.getActualTypeArguments().length == 1) {
// There's only one
index = 0;
}
Type typeArgumentAtIndex = parameterizedType.getActualTypeArguments()[index];
if (typeArgumentAtIndex instanceof ParameterizedType
&& !paramType.isWrapper()) {
if (FunctionInspector.isWrapper(
((ParameterizedType) typeArgumentAtIndex).getRawType())) {
param = ((ParameterizedType) typeArgumentAtIndex)
.getActualTypeArguments()[0];
param = extractNestedType(paramType, param);
}
else {
param = extractNestedType(paramType, typeArgumentAtIndex);
}
}
else {
param = extractNestedType(paramType, typeArgumentAtIndex);
}
}
else {
param = Object.class;
}
return param;
}
private Type extractNestedType(ParamType paramType, Type param) {
if (!paramType.isInnerWrapper() && param instanceof ParameterizedType) {
if (((ParameterizedType) param).getRawType().getTypeName()
.startsWith(Message.class.getName())) {
param = ((ParameterizedType) param).getActualTypeArguments()[0];
}
}
return param;
}
enum ParamType {
INPUT, OUTPUT, INPUT_WRAPPER, OUTPUT_WRAPPER, INPUT_INNER_WRAPPER, OUTPUT_INNER_WRAPPER;
public boolean isOutput() {
return this == OUTPUT || this == OUTPUT_WRAPPER
|| this == OUTPUT_INNER_WRAPPER;
}
public boolean isInput() {
return this == INPUT || this == INPUT_WRAPPER || this == INPUT_INNER_WRAPPER;
}
public boolean isWrapper() {
return this == OUTPUT_WRAPPER || this == INPUT_WRAPPER;
}
public boolean isInnerWrapper() {
return this == OUTPUT_INNER_WRAPPER || this == INPUT_INNER_WRAPPER;
}
}
}

View File

@@ -42,7 +42,6 @@ import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.ConstructorArgumentValues.ValueHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
@@ -50,7 +49,6 @@ import org.springframework.cloud.function.context.FunctionScan;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent;
import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent;
import org.springframework.cloud.function.context.catalog.InMemoryFunctionCatalog;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxSupplier;
@@ -86,7 +84,6 @@ import reactor.core.publisher.Flux;
*/
@FunctionScan
@Configuration
@ConditionalOnClass(InMemoryFunctionCatalog.class)
@ConditionalOnMissingBean(FunctionCatalog.class)
public class ContextFunctionCatalogAutoConfiguration {
@@ -461,7 +458,7 @@ public class ContextFunctionCatalogAutoConfiguration {
isMessage(target);
registration.target(target((Supplier<?>) target, key));
for (String name : registration.getNames()) {
this.suppliers.put(name, (Supplier<?>) registration.getTarget());
this.suppliers.put(name, registration.getTarget());
}
}
else if (target instanceof Consumer) {
@@ -471,7 +468,7 @@ public class ContextFunctionCatalogAutoConfiguration {
isMessage(target); // cache wrapper types
registration.target(target((Consumer<?>) target, key));
for (String name : registration.getNames()) {
this.consumers.put(name, (Consumer<?>) registration.getTarget());
this.consumers.put(name, registration.getTarget());
}
}
else if (target instanceof Function) {
@@ -483,7 +480,7 @@ public class ContextFunctionCatalogAutoConfiguration {
isMessage(target); // cache wrapper types
registration.target(target((Function<?, ?>) target, key));
for (String name : registration.getNames()) {
this.functions.put(name, (Function<?, ?>) registration.getTarget());
this.functions.put(name, registration.getTarget());
}
}
else {

View File

@@ -0,0 +1,163 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.junit.Test;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FunctionTypeTests {
@Test
public void plainFunction() {
FunctionType function = new FunctionType(IntegerToString.class);
assertThat(function.getInputType()).isEqualTo(Integer.class);
assertThat(function.getOutputType()).isEqualTo(String.class);
assertThat(function.getInputWrapper()).isEqualTo(Integer.class);
assertThat(function.getOutputWrapper()).isEqualTo(String.class);
assertThat(function.isMessage()).isEqualTo(false);
}
@Test
public void genericFunction() {
FunctionType function = new FunctionType(StringToMap.class);
assertThat(function.getInputType()).isEqualTo(String.class);
assertThat(function.getOutputType()).isEqualTo(Map.class);
assertThat(function.getInputWrapper()).isEqualTo(String.class);
assertThat(function.getOutputWrapper()).isEqualTo(Map.class);
assertThat(function.isMessage()).isEqualTo(false);
}
@Test
public void pojoFunction() {
FunctionType function = new FunctionType(FooToFoo.class);
assertThat(function.getInputType()).isEqualTo(Foo.class);
assertThat(function.getOutputType()).isEqualTo(Bar.class);
assertThat(function.getInputWrapper()).isEqualTo(Foo.class);
assertThat(function.getOutputWrapper()).isEqualTo(Bar.class);
assertThat(function.isMessage()).isEqualTo(false);
}
@Test
public void fluxFunction() {
FunctionType function = new FunctionType(FluxToFlux.class);
assertThat(function.getInputType()).isEqualTo(Foo.class);
assertThat(function.getOutputType()).isEqualTo(Bar.class);
assertThat(function.getInputWrapper()).isEqualTo(Flux.class);
assertThat(function.getOutputWrapper()).isEqualTo(Flux.class);
assertThat(function.isMessage()).isEqualTo(false);
}
@Test
public void fluxMessageFunction() {
FunctionType function = new FunctionType(FluxMessageToFluxMessage.class);
assertThat(function.getInputType()).isEqualTo(Foo.class);
assertThat(function.getOutputType()).isEqualTo(Bar.class);
assertThat(function.getInputWrapper()).isEqualTo(Flux.class);
assertThat(function.getOutputWrapper()).isEqualTo(Flux.class);
assertThat(function.isMessage()).isEqualTo(true);
}
@Test
public void plainFunctionFromType() {
Type type = ResolvableType
.forClassWithGenerics(Function.class, Integer.class, String.class)
.getType();
FunctionType function = new FunctionType(type);
assertThat(function.getInputType()).isEqualTo(Integer.class);
assertThat(function.getOutputType()).isEqualTo(String.class);
assertThat(function.getInputWrapper()).isEqualTo(Integer.class);
assertThat(function.getOutputWrapper()).isEqualTo(String.class);
assertThat(function.isMessage()).isEqualTo(false);
}
@Test
public void fluxMessageFunctionFromType() {
Type type = ResolvableType
.forClassWithGenerics(Function.class,
ResolvableType.forClassWithGenerics(
Flux.class,
ResolvableType.forClassWithGenerics(Message.class,
Foo.class)),
ResolvableType.forClassWithGenerics(Flux.class, ResolvableType
.forClassWithGenerics(Message.class, Bar.class)))
.getType();
FunctionType function = new FunctionType(type);
assertThat(function.getInputType()).isEqualTo(Foo.class);
assertThat(function.getOutputType()).isEqualTo(Bar.class);
assertThat(function.getInputWrapper()).isEqualTo(Flux.class);
assertThat(function.getOutputWrapper()).isEqualTo(Flux.class);
assertThat(function.isMessage()).isEqualTo(true);
}
private static class IntegerToString implements Function<Integer, String> {
@Override
public String apply(Integer t) {
return "" + t;
}
}
private static class StringToMap implements Function<String, Map<String, Integer>> {
@Override
public Map<String, Integer> apply(String t) {
return Collections.emptyMap();
}
}
private static class FooToFoo implements Function<Foo, Bar> {
@Override
public Bar apply(Foo t) {
return new Bar();
}
}
private static class FluxToFlux implements Function<Flux<Foo>, Flux<Bar>> {
@Override
public Flux<Bar> apply(Flux<Foo> t) {
return t.map(f -> new Bar());
}
}
private static class FluxMessageToFluxMessage
implements Function<Flux<Message<Foo>>, Flux<Message<Bar>>> {
@Override
public Flux<Message<Bar>> apply(Flux<Message<Foo>> t) {
return t.map(f -> MessageBuilder.withPayload(new Bar())
.copyHeadersIfAbsent(f.getHeaders()).build());
}
}
private static class Foo {
}
private static class Bar {
}
}