Ensure composite wrapper type reflects reality

When a Supplier<Flux<Foo>> is composed with a Function<Foo,Bar>
the resulting handler (supplier) should have Flux as its output
wrapper still (the most general output wrapper type in the chain).
This commit is contained in:
Dave Syer
2018-03-16 08:30:33 -04:00
parent 47f86671ca
commit 773dddbe68
5 changed files with 72 additions and 11 deletions

View File

@@ -98,6 +98,9 @@ public class FunctionType {
}
public static boolean isWrapper(Type type) {
if (type instanceof ParameterizedType) {
type = ((ParameterizedType)type).getRawType();
}
return Publisher.class.equals(type) || Flux.class.equals(type)
|| Mono.class.equals(type) || Optional.class.equals(type);
}
@@ -147,21 +150,34 @@ public class FunctionType {
public static FunctionType compose(FunctionType input, FunctionType output) {
ResolvableType inputGeneric = input(input);
ResolvableType outputGeneric = output(output);
if (!isWrapper(outputGeneric.getType())) {
ResolvableType inputOutput = output(input);
if (isWrapper(inputOutput.getType())) {
outputGeneric = wrap(input,
extractClass(inputOutput.getType(), ParamType.OUTPUT_WRAPPER),
extractClass(outputGeneric.getType(), ParamType.OUTPUT));
}
}
return new FunctionType(ResolvableType
.forClassWithGenerics(Function.class, inputGeneric, outputGeneric)
.getType());
}
private ResolvableType wrap(Class<?> wrapper, Class<?> type) {
return isMessage() ? wrap(wrapper, message(type))
return wrap(this, wrapper, type);
}
private static ResolvableType wrap(FunctionType input, Class<?> wrapper,
Class<?> type) {
return input.isMessage() ? wrap(wrapper, message(type))
: ResolvableType.forClassWithGenerics(wrapper, type);
}
private ResolvableType wrap(Class<?> wrapper, ResolvableType type) {
private static ResolvableType wrap(Class<?> wrapper, ResolvableType type) {
return ResolvableType.forClassWithGenerics(wrapper, type);
}
private ResolvableType message(Class<?> type) {
private static ResolvableType message(Class<?> type) {
return ResolvableType.forClassWithGenerics(Message.class, type);
}
@@ -224,7 +240,7 @@ public class FunctionType {
return Object.class;
}
private Class<?> extractClass(Type param, ParamType paramType) {
private static Class<?> extractClass(Type param, ParamType paramType) {
if (param instanceof ParameterizedType) {
ParameterizedType concrete = (ParameterizedType) param;
param = concrete.getRawType();

View File

@@ -165,6 +165,18 @@ public class FunctionTypeTests {
assertThat(function.isMessage()).isEqualTo(true);
}
@Test
public void compose() {
FunctionType input = FunctionType.from(Foo.class).to(Bar.class).wrap(Flux.class);
FunctionType output = FunctionType.from(Bar.class).to(String.class);
FunctionType function = FunctionType.compose(input, output);
assertThat(function.getInputType()).isEqualTo(Foo.class);
assertThat(function.getOutputType()).isEqualTo(String.class);
assertThat(function.getInputWrapper()).isEqualTo(Flux.class);
assertThat(function.getOutputWrapper()).isEqualTo(Flux.class);
assertThat(function.isMessage()).isEqualTo(false);
}
@Test
public void idempotentMessage() {
FunctionType function = FunctionType.from(Foo.class).to(Bar.class).message()

View File

@@ -98,7 +98,8 @@ public class ContextFunctionPostProcessorTests {
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) processor
.lookupFunction("foos,bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
assertThat(processor.getRegistration(foos).getNames()).containsExactly("foos|bars");
assertThat(processor.getRegistration(foos).getNames())
.containsExactly("foos|bars");
}
@Test
@@ -109,7 +110,21 @@ public class ContextFunctionPostProcessorTests {
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) processor
.lookupFunction("foos|bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
assertThat(processor.getRegistration(foos).getNames()).containsExactly("foos|bars");
assertThat(processor.getRegistration(foos).getNames())
.containsExactly("foos|bars");
}
@Test
public void composeWrapper() {
processor.register(new FunctionRegistration<>(new WrappedSource()).names("ints"));
processor.register(new FunctionRegistration<>(new Foos()).names("foos"));
@SuppressWarnings("unchecked")
Supplier<Flux<String>> foos = (Supplier<Flux<String>>) processor
.lookupSupplier("ints|foos");
assertThat(foos.get().blockFirst()).isEqualTo("8");
assertThat(processor.getRegistration(foos).getNames())
.containsExactly("ints|foos");
assertThat(processor.getRegistration(foos).getType().getOutputWrapper()).isEqualTo(Flux.class);
}
@Test
@@ -127,7 +142,8 @@ public class ContextFunctionPostProcessorTests {
public void isolatedSupplier() {
contextClassLoader = ClassUtils
.overrideThreadContextClassLoader(getClass().getClassLoader());
processor.register(new FunctionRegistration<>(create(Source.class)).names("source"));
processor.register(
new FunctionRegistration<>(create(Source.class)).names("source"));
@SuppressWarnings("unchecked")
Supplier<Flux<Integer>> source = (Supplier<Flux<Integer>>) processor
.lookupSupplier("source");
@@ -145,7 +161,8 @@ public class ContextFunctionPostProcessorTests {
.lookupConsumer("sink");
sink.accept(Flux.just("Hello"));
@SuppressWarnings("unchecked")
List<String> values = (List<String>) ReflectionTestUtils.getField(target, "values");
List<String> values = (List<String>) ReflectionTestUtils.getField(target,
"values");
assertThat(values).contains("Hello");
}
@@ -201,6 +218,15 @@ public class ContextFunctionPostProcessorTests {
}
public static class WrappedSource implements Supplier<Flux<Integer>> {
@Override
public Flux<Integer> get() {
return Flux.just(4);
}
}
public static class Foo {
private String value;
@@ -239,5 +265,5 @@ public class ContextFunctionPostProcessorTests {
}
}
}