@@ -859,7 +859,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
}
|
||||
}
|
||||
else { //if (rawType instanceof Class<?>) { // see AWS adapter with WildardTypeImpl and Azure with Voids
|
||||
convertedValue = this.convertNonMessageInputIfNecessary(type, value);
|
||||
convertedValue = FunctionTypeUtils.isPublisher(type)
|
||||
? this.convertNonMessageInputIfNecessary(rawType, value)
|
||||
: this.convertNonMessageInputIfNecessary(type, value);
|
||||
}
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
||||
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
@@ -505,6 +506,19 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
assertThat(result).isEqualTo("BIKE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGH_608() {
|
||||
ApplicationContext context = new SpringApplicationBuilder(SampleFunctionConfiguration.class)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.main.lazy-initialization=true");
|
||||
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
|
||||
|
||||
Consumer<Flux<String>> consumer = catalog.lookup("reactivePojoConsumer");
|
||||
consumer.accept(Flux.just("{\"name\":\"Ricky\"}"));
|
||||
SampleFunctionConfiguration config = context.getBean(SampleFunctionConfiguration.class);
|
||||
assertThat(((Person) config.consumerInputRef.get()).getName()).isEqualTo("Ricky");
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
public static class PojoToMessageFunctionCompositionConfiguration {
|
||||
|
||||
@@ -660,6 +674,8 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
@Configuration
|
||||
protected static class SampleFunctionConfiguration {
|
||||
|
||||
AtomicReference<Object> consumerInputRef = new AtomicReference<>();
|
||||
|
||||
@Bean
|
||||
public Function<Person, Person> uppercasePerson() {
|
||||
return person -> {
|
||||
@@ -816,6 +832,12 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
public Consumer<Flux<String>> reactiveConsumer() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Bean
|
||||
// Perhaps it should not be allowed. Recommend Function<Flux, Mono<Void>>
|
||||
public Consumer<Flux<Person>> reactivePojoConsumer() {
|
||||
return flux -> flux.subscribe(v -> consumerInputRef.set(v));
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
|
||||
Reference in New Issue
Block a user