Support for Function<Publisher<...>,...>
This commit is contained in:
@@ -19,6 +19,8 @@ package org.springframework.cloud.function.context.catalog;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@@ -44,7 +46,7 @@ public interface FunctionInspector {
|
||||
|
||||
// Maybe make this a default method?
|
||||
static boolean isWrapper(Type type) {
|
||||
return Flux.class.equals(type) || Mono.class.equals(type)
|
||||
return Publisher.class.equals(type) || Flux.class.equals(type) || Mono.class.equals(type)
|
||||
|| Optional.class.equals(type);
|
||||
}
|
||||
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.function.context.config;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
@@ -30,6 +28,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
@@ -63,6 +62,8 @@ import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StreamUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
@@ -175,6 +176,18 @@ public class ContextFunctionCatalogAutoConfigurationTests {
|
||||
.isAssignableFrom(Flux.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publisherMessageFunction() {
|
||||
create(PublisherMessageConfiguration.class);
|
||||
assertThat(context.getBean("function")).isInstanceOf(Function.class);
|
||||
assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class);
|
||||
assertThat(inspector.isMessage(catalog.lookupFunction("function"))).isTrue();
|
||||
assertThat(inspector.getInputType(catalog.lookupFunction("function")))
|
||||
.isAssignableFrom(String.class);
|
||||
assertThat(inspector.getInputWrapper(catalog.lookupFunction("function")))
|
||||
.isAssignableFrom(Publisher.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void messageFunction() {
|
||||
create(MessageConfiguration.class);
|
||||
@@ -572,6 +585,16 @@ public class ContextFunctionCatalogAutoConfigurationTests {
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class PublisherMessageConfiguration {
|
||||
@Bean
|
||||
public Function<Publisher<Message<String>>, Publisher<Message<String>>> function() {
|
||||
return flux -> Flux.from(flux).map(m -> MessageBuilder
|
||||
.withPayload(m.getPayload().toUpperCase()).build());
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class MessageConfiguration {
|
||||
|
||||
Reference in New Issue
Block a user