Enable JSON conversion for non-Flux functions

- use ProxyWrapper around a FluxConsumer as well
  making it consistent with the behaviour of Flux
  functions
- Enable introspection for scanned beans
- Fix failing tests by passing JSON string as input messages
  (marshalled form expected from the binder)
This commit is contained in:
Marius Bogoevici
2017-04-05 20:22:06 -04:00
committed by Dave Syer
parent d1cf9b47a4
commit 70dff6bb6b
4 changed files with 40 additions and 18 deletions

View File

@@ -32,6 +32,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,9 +40,9 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.function.registry.FunctionCatalog;
@@ -53,13 +54,12 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.type.StandardMethodMetadata;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
* @author Mark Fisher
@@ -209,7 +209,7 @@ public class ContextFunctionCatalogAutoConfiguration {
else if (!isFluxSupplier(key, target)) {
@SuppressWarnings({ "unchecked", "rawtypes" })
FluxSupplier value = new FluxSupplier(target);
return value;
return wrapSupplier(value, mapper, key);
}
else {
return target;
@@ -226,7 +226,7 @@ public class ContextFunctionCatalogAutoConfiguration {
else if (!isFluxFunction(key, target)) {
@SuppressWarnings({ "unchecked", "rawtypes" })
FluxFunction value = new FluxFunction(target);
return value;
return wrapFunction(value, mapper, key);
}
else {
return target;
@@ -242,7 +242,7 @@ public class ContextFunctionCatalogAutoConfiguration {
else if (!isFluxConsumer(key, target)) {
@SuppressWarnings({ "unchecked", "rawtypes" })
FluxConsumer value = new FluxConsumer(target);
return value;
return wrapConsumer(value, mapper, key);
}
else {
return target;
@@ -367,16 +367,38 @@ public class ContextFunctionCatalogAutoConfiguration {
return wrapped;
}
private Class<?> findType(RootBeanDefinition definition, int index) {
StandardMethodMetadata source = (StandardMethodMetadata) definition
.getSource();
private Class<?> findType(AbstractBeanDefinition definition, int index) {
Object source = definition.getSource();
Type param;
if (source instanceof StandardMethodMetadata) {
ParameterizedType type;
type = (ParameterizedType) (source.getIntrospectedMethod()
.getGenericReturnType());
type = (ParameterizedType) type.getActualTypeArguments()[index];
param = type.getActualTypeArguments()[0];
type = (ParameterizedType) ((StandardMethodMetadata) source).getIntrospectedMethod()
.getGenericReturnType();
Type typeArgumentAtIndex = type.getActualTypeArguments()[index];
if (typeArgumentAtIndex instanceof ParameterizedType) {
param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0];
}
else {
param = typeArgumentAtIndex;
}
}
else if (source instanceof FileSystemResource) {
try {
Type type = ClassUtils.forName(definition.getBeanClassName(), null);
if (type instanceof ParameterizedType) {
Type typeArgumentAtIndex = ((ParameterizedType)type).getActualTypeArguments()[index];
if (typeArgumentAtIndex instanceof ParameterizedType) {
param = ((ParameterizedType) typeArgumentAtIndex).getActualTypeArguments()[0];
} else {
param = typeArgumentAtIndex;
}
}
else {
param = type;
}
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Cannot instrospect bean: " + definition, e);
}
}
else {
ResolvableType resolvable = (ResolvableType) getField(definition,
@@ -398,11 +420,11 @@ public class ContextFunctionCatalogAutoConfiguration {
}
private Class<?> findType(String name) {
return findType((RootBeanDefinition) registry.getBeanDefinition(name), 0);
return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 0);
}
private Class<?> findOutputType(String name) {
return findType((RootBeanDefinition) registry.getBeanDefinition(name), 1);
return findType((AbstractBeanDefinition) registry.getBeanDefinition(name), 1);
}
}

View File

@@ -66,7 +66,7 @@ public class FluxPojoStreamingConsumerTests {
@Bean
public Consumer<Flux<String>> sinkConsumer(final List<String> sinkCollector) {
return foos -> foos.doOnNext(s -> sinkCollector.add(s));
return foos -> foos.subscribe(s -> sinkCollector.add(s));
}
}

View File

@@ -52,7 +52,7 @@ public class FluxStreamingConsumerTests {
@Test
public void test() throws Exception {
sink.input().send(MessageBuilder.withPayload(new Foo("foo")).build());
sink.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
assertThat(sinkCollector).hasSize(1);
}

View File

@@ -50,7 +50,7 @@ public class PojoStreamingConsumerTests {
@Test
public void test() throws Exception {
sink.input().send(MessageBuilder.withPayload(new Foo("foo")).build());
sink.input().send(MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
assertThat(sinkCollector).hasSize(1);
}