Refactor FunctionCatalog implementation

This commit is contained in:
Oleg Zhurakousky
2020-09-17 14:02:51 +02:00
parent 978a474c81
commit 72f05fc591
34 changed files with 1597 additions and 1643 deletions

View File

@@ -32,6 +32,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
@@ -41,6 +42,7 @@ import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
@@ -184,10 +186,12 @@ public class RequestProcessor {
private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler,
Publisher<?> result, Boolean single, boolean getter) {
BodyBuilder builder = ResponseEntity.ok();
if (this.inspector.isMessage(handler)) {
if (((FunctionInvocationWrapper) handler).isInputTypeMessage()) {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> addHeaders(builder, value))
.doOnNext(value -> {
addHeaders(builder, value);
})
.map(message -> message.getPayload());
}
else {
@@ -256,6 +260,7 @@ public class RequestProcessor {
}
else if (function instanceof FunctionInvocationWrapper) {
Publisher<?> result = (Publisher<?>) function.apply(flux);
// Publisher<?> result = null;
if (((FunctionInvocationWrapper) function).isConsumer()) {
if (result != null) {
((Mono) result).subscribe();
@@ -455,11 +460,33 @@ public class RequestProcessor {
}
private Type getItemType(Object function) {
Class<?> inputType = this.inspector.getInputType(function);
if (function == null || ((FunctionInvocationWrapper) function).getInputType() == Object.class) {
return Object.class;
}
Type itemType;
if (((FunctionInvocationWrapper) function).isInputTypePublisher() && ((FunctionInvocationWrapper) function).isInputTypeMessage()) {
itemType = FunctionTypeUtils.getImmediateGenericType(((FunctionInvocationWrapper) function).getInputType(), 0);
itemType = FunctionTypeUtils.getImmediateGenericType(itemType, 0);
}
else {
itemType = FunctionTypeUtils.getImmediateGenericType(((FunctionInvocationWrapper) function).getInputType(), 0);
}
if (itemType != null) {
return itemType;
}
Class<?> inputType = ((FunctionInvocationWrapper) function).isInputTypeMessage() || ((FunctionInvocationWrapper) function).isInputTypePublisher()
? TypeResolver.resolveRawClass(itemType, null)
: ((FunctionInvocationWrapper) function).getRawInputType();
if (!Collection.class.isAssignableFrom(inputType)) {
return inputType;
}
Type type = this.inspector.getRegistration(function).getType().getType();
// Type type = this.inspector.getRegistration(function).getType().getType();
Type type = ((FunctionInvocationWrapper) function).getInputType();
if (type instanceof ParameterizedType) {
type = ((ParameterizedType) type).getActualTypeArguments()[0];
}

View File

@@ -63,7 +63,8 @@ public class FunctionController {
public Mono<ResponseEntity<?>> post(WebRequest request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
return this.processor.post(wrapper, body, false);
Mono<ResponseEntity<?>> result = this.processor.post(wrapper, body, false);
return result;
}
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.web.source;
import java.lang.reflect.Type;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
@@ -34,6 +35,7 @@ import org.springframework.cloud.function.web.source.FunctionExporterAutoConfigu
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.Environment;
import org.springframework.web.reactive.function.client.WebClient;
@@ -66,9 +68,11 @@ public class FunctionExporterAutoConfiguration {
public FunctionRegistration<Supplier<Flux<?>>> origin(WebClient.Builder builder) {
HttpSupplier supplier = new HttpSupplier(builder.build(), this.props);
FunctionRegistration<Supplier<Flux<?>>> registration = new FunctionRegistration<>(supplier);
FunctionType type = FunctionType.supplier(this.props.getSource().getType()).wrap(Flux.class);
Type rawType = ResolvableType.forClassWithGenerics(Supplier.class, this.props.getSource().getType()).getType();
// FunctionType functionType = FunctionType.supplier(this.props.getSource().getType()).wrap(Flux.class);
FunctionType type = FunctionType.of(rawType);
if (this.props.getSource().isIncludeHeaders()) {
type = type.message();
// type = type.message();
}
registration = registration.type(type);
return registration;

View File

@@ -168,7 +168,11 @@ public class SupplierExporter implements SmartLifecycle {
}
private Flux<ClientResponse> forward(Supplier<Publisher<Object>> supplier, String name) {
return Flux.from(supplier.get()).flatMap(value -> {
Flux o = (Flux) supplier.get();
// o.subscribe(v -> {
// System.out.println(v);
// });
return Flux.from(o).flatMap(value -> {
String destination = this.destinationResolver.destination(supplier, name, value);
if (this.debug) {
logger.info("Posting to: " + destination);

View File

@@ -29,6 +29,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.web.reactive.server.WebTestClient;
/**
* @author Oleg Zhurakousky
*
@@ -44,15 +45,16 @@ public class HeadersToMessageTests {
@Test
public void testBodyAndCustomHeaderFromMessagePropagation() throws Exception {
this.client.post().uri("/").body(Mono.just("foo"), String.class).exchange()
.expectStatus().isOk().expectHeader()
.valueEquals("x-content-type", "application/xml").expectHeader()
.valueEquals("foo", "bar").expectBody(String.class).isEqualTo("FOO");
.expectStatus().isOk().expectHeader()
.valueEquals("x-content-type", "application/xml").expectHeader()
.valueEquals("foo", "bar").expectBody(String.class).isEqualTo("FOO");
}
@SpringBootConfiguration
protected static class TestConfiguration
implements Function<Message<String>, Message<String>> {
@Override
public Message<String> apply(Message<String> request) {
Message<String> message = MessageBuilder
.withPayload(request.getPayload().toUpperCase())

View File

@@ -26,6 +26,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -38,7 +39,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.function.web.RestApplication;
@@ -50,6 +50,7 @@ import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
@@ -81,7 +82,13 @@ public class HttpPostIntegrationTests {
this.test.list.clear();
}
@AfterEach
public void done() {
this.test.list.clear();
}
@Test
@DirtiesContext
public void qualifierFoos() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/foos")).contentType(MediaType.APPLICATION_JSON)
@@ -92,6 +99,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void updates() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON)
@@ -102,6 +110,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void updatesJson() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/updates")).contentType(MediaType.APPLICATION_JSON)
@@ -112,6 +121,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void addFoos() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/addFoos")).contentType(MediaType.APPLICATION_JSON)
@@ -122,6 +132,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void addFoosFlux() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/addFoosFlux")).contentType(MediaType.APPLICATION_JSON)
@@ -132,6 +143,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void bareUpdates() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON)
@@ -141,6 +153,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void uppercase() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
@@ -149,6 +162,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void messages() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/messages")).contentType(MediaType.APPLICATION_JSON)
@@ -159,6 +173,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void headers() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON)
@@ -169,6 +184,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void uppercaseSingleValue() throws Exception {
ResponseEntity<String> result = this.rest
.exchange(
@@ -189,6 +205,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void uppercaseFoos() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/upFoos")).contentType(MediaType.APPLICATION_JSON)
@@ -198,6 +215,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void uppercaseFoo() throws Exception {
// Single Foo can be parsed
ResponseEntity<String> result = this.rest.exchange(RequestEntity
@@ -207,6 +225,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void bareUppercaseFoos() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/bareUpFoos")).contentType(MediaType.APPLICATION_JSON)
@@ -216,6 +235,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void typelessFunctionPassingArray() throws Exception {
ResponseEntity<String> result = this.rest.exchange(
RequestEntity.post(new URI("/typelessFunctionExpectingText"))
@@ -225,6 +245,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void bareUppercaseFoo() throws Exception {
// Single Foo can be parsed and returns a single value if the function is defined
// that way
@@ -235,6 +256,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void bareUppercase() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/bareUppercase")).contentType(MediaType.APPLICATION_JSON)
@@ -243,6 +265,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void singleValuedText() throws Exception {
ResponseEntity<String> result = this.rest.exchange(
RequestEntity.post(new URI("/bareUppercase")).accept(MediaType.TEXT_PLAIN)
@@ -252,6 +275,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void transform() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/transform")).contentType(MediaType.APPLICATION_JSON)
@@ -260,6 +284,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void postMore() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/post/more")).contentType(MediaType.APPLICATION_JSON)
@@ -268,6 +293,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void convertPost() throws Exception {
ResponseEntity<String> result = this.rest
.exchange(
@@ -279,6 +305,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void convertPostJson() throws Exception {
// If you POST a single value to a Function<Flux<Integer>,Flux<Integer>> it can't
// determine if the output is single valued, so it has to send an array back
@@ -291,6 +318,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void uppercaseJsonArray() throws Exception {
assertThat(this.rest.exchange(
RequestEntity.post(new URI("/maps"))
@@ -302,6 +330,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void uppercaseSSE() throws Exception {
assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
@@ -309,6 +338,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void sum() throws Exception {
LinkedMultiValueMap<String, String> map = new LinkedMultiValueMap<>();
@@ -323,6 +353,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void multipart() throws Exception {
LinkedMultiValueMap<String, String> map = new LinkedMultiValueMap<>();
@@ -337,6 +368,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void count() throws Exception {
List<String> list = Arrays.asList("A", "B", "A");
assertThat(this.rest.exchange(
@@ -346,6 +378,7 @@ public class HttpPostIntegrationTests {
}
@Test
@DirtiesContext
public void fluxWithList() throws Exception {
List<String> list = Arrays.asList("A", "B", "A");
assertThat(this.rest.exchange(
@@ -359,7 +392,6 @@ public class HttpPostIntegrationTests {
}
@EnableAutoConfiguration
@TestConfiguration
public static class ApplicationConfiguration {
private List<String> list = new ArrayList<>();