From aba50816f74795292c187407db10cc05283371df Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 9 Nov 2018 12:35:36 +0000 Subject: [PATCH] Add support for detecting FunctionRegistration or Function User can now provide a Function or an ApplicationInitializer. Also the initializer can create a FunctionRegistration with the handler name instead of a bean with the handler name. Better control of input and output types that way. Fixes gh-231 --- .../src/it/flux/pom.xml | 4 +- .../src/it/support/pom.xml | 2 +- .../function/deployer/ContextRunner.java | 29 +--- .../FunctionCreatorConfiguration.java | 29 +++- .../FunctionCreatorConfigurationTests.java | 37 ++++- .../function/test/FunctionInitializer.java | 53 +++++++ .../function/test/FunctionRegistrar.java | 28 ++-- .../cloud/function/web/RequestProcessor.java | 149 ++++++++++++++++-- .../function/web/flux/FunctionController.java | 7 + .../function/FunctionEndpointInitializer.java | 6 +- 10 files changed, 281 insertions(+), 63 deletions(-) create mode 100644 spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionInitializer.java diff --git a/spring-cloud-function-deployer/src/it/flux/pom.xml b/spring-cloud-function-deployer/src/it/flux/pom.xml index 706662d9c..2f4113539 100644 --- a/spring-cloud-function-deployer/src/it/flux/pom.xml +++ b/spring-cloud-function-deployer/src/it/flux/pom.xml @@ -5,7 +5,7 @@ com.example flux-sample - 1.0.0.M1 + 1.0.0.RC1 jar @@ -18,7 +18,7 @@ 1.8 2.0.0.BUILD-SNAPSHOT - 1.0.15.RELEASE + 1.0.17.RELEASE diff --git a/spring-cloud-function-deployer/src/it/support/pom.xml b/spring-cloud-function-deployer/src/it/support/pom.xml index ff1ceb1b9..5b4b7c25f 100644 --- a/spring-cloud-function-deployer/src/it/support/pom.xml +++ b/spring-cloud-function-deployer/src/it/support/pom.xml @@ -18,7 +18,7 @@ 1.8 2.0.0.BUILD-SNAPSHOT - 1.0.12.RELEASE + 1.0.17.RELEASE diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/ContextRunner.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/ContextRunner.java index d7cf96d08..86b9d82ab 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/ContextRunner.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/ContextRunner.java @@ -18,13 +18,10 @@ package org.springframework.cloud.function.deployer; import java.lang.reflect.Field; import java.net.URL; -import java.util.Collections; import java.util.Map; -import org.springframework.beans.BeanUtils; import org.springframework.boot.SpringApplication; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextInitializer; +import org.springframework.cloud.function.context.FunctionalSpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.MapPropertySource; import org.springframework.core.env.StandardEnvironment; @@ -59,16 +56,7 @@ public class ContextRunner { new MapPropertySource("appDeployer", properties)); running = true; Class sourceClass = ClassUtils.resolveClassName(source, null); - ApplicationContextInitializer initializer = null; - if (ApplicationContextInitializer.class.isAssignableFrom(sourceClass)) { - initializer = BeanUtils.instantiateClass(sourceClass, ApplicationContextInitializer.class); - sourceClass = Dummy.class; - } SpringApplication builder = builder(sourceClass); - if (initializer!=null) { - builder.addInitializers(initializer); - builder.setDefaultProperties(Collections.singletonMap("spring.functional.enabled", "true")); - } builder.setEnvironment(environment); builder.setRegisterShutdownHook(false); context = builder.run(args); @@ -131,19 +119,8 @@ public class ContextRunner { } private static SpringApplication builder(Class type) { - if (type==Dummy.class) { - SpringApplication application = new SpringApplication() { - @Override - protected void load(ApplicationContext context, Object[] sources) { - } - }; - // Boot doesn't allow null sources - application.setSources(Collections.singleton(Dummy.class.getName())); - return application; - } - return new SpringApplication(type); + SpringApplication application = new FunctionalSpringApplication(type); + return application; } - - private class Dummy {} } diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionCreatorConfiguration.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionCreatorConfiguration.java index cfd70a9aa..4eaf821eb 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionCreatorConfiguration.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionCreatorConfiguration.java @@ -51,10 +51,12 @@ import org.springframework.boot.loader.archive.Archive; import org.springframework.boot.loader.archive.ExplodedArchive; import org.springframework.boot.loader.archive.JarFileArchive; import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader; +import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.core.FluxFunction; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; @@ -381,6 +383,32 @@ class FunctionCreatorConfiguration { Object result = null; if (this.runner != null) { result = this.runner.getBean(type); + if (result == null) { + if (this.runner.containsBean(FunctionCatalog.class.getName())) { + Object catalog = this.runner + .getBean(FunctionCatalog.class.getName()); + result = this.runner.evaluate("lookup(#function).getTarget()", + catalog, "function", type); + if (result != null) { + logger.info("Located registration: " + type + " of type " + + result.getClass()); + } + } + } + else { + logger.info("Located bean: " + type + " of type " + + result.getClass()); + if (result.getClass().getName() + .equals(FunctionRegistration.class.getName())) { + result = this.runner.evaluate("getTarget()", result); + } + } + if (result != null) { + if (result.getClass().getName() + .equals(FluxFunction.class.getName())) { + result = this.runner.evaluate("getTarget()", result); + } + } } if (result == null) { logger.info("No bean found. Instantiating: " + type); @@ -390,7 +418,6 @@ class FunctionCreatorConfiguration { } } if (result != null) { - logger.info("Located bean: " + type); return result; } throw new IllegalStateException("Cannot create bean for: " + type); diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionCreatorConfigurationTests.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionCreatorConfigurationTests.java index 3eaabff38..20a1c6f90 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionCreatorConfigurationTests.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionCreatorConfigurationTests.java @@ -21,7 +21,6 @@ import java.util.function.Supplier; import org.junit.Assume; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -97,13 +96,45 @@ public abstract class FunctionCreatorConfigurationTests { @EnableAutoConfiguration @TestPropertySource(properties = { "function.location=app:classpath,file:target/test-classes,file:target/test-classes/app", - "function.bean=myDoubler", + "function.bean=doubler", "function.main=org.springframework.cloud.function.test.FunctionRegistrar" }) public static class SingleFunctionWithRegistrarTests extends FunctionCreatorConfigurationTests { @Test - @Ignore // related to boot 2.1 no bean override change + public void testDouble() { + Function, Flux> function = catalog + .lookup(Function.class, "function0"); + assertThat(function.apply(Flux.just(2)).blockFirst()).isEqualTo(4); + } + + } + + @EnableAutoConfiguration + @TestPropertySource(properties = { + "function.location=app:classpath,file:target/test-classes,file:target/test-classes/app", + "function.bean=frenchizer", + "function.main=org.springframework.cloud.function.test.FunctionRegistrar" }) + public static class SingleFunctionWithRegistrarAndRegistrationTests + extends FunctionCreatorConfigurationTests { + + @Test + public void testFrenchize() { + Function, Flux> function = catalog + .lookup(Function.class, "function0"); + assertThat(function.apply(Flux.just(2)).blockFirst()).isEqualTo("deux"); + } + } + + @EnableAutoConfiguration + @TestPropertySource(properties = { + "function.location=app:classpath,file:target/test-classes,file:target/test-classes/app", + "function.bean=myDoubler", + "function.main=org.springframework.cloud.function.test.FunctionInitializer" }) + public static class SingleFunctionWithInitializerTests + extends FunctionCreatorConfigurationTests { + + @Test public void testDouble() { Function, Flux> function = catalog .lookup(Function.class, "function0"); diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionInitializer.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionInitializer.java new file mode 100644 index 000000000..ea5880fab --- /dev/null +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionInitializer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.test; + +import org.springframework.boot.SpringApplication; +import org.springframework.cloud.function.context.FunctionalSpringApplication; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.support.GenericApplicationContext; + +/** + * @author Dave Syer + */ +public class FunctionInitializer + implements ApplicationContextInitializer { + + @Bean + public Doubler myDoubler() { + return new Doubler(); + } + + @Bean + public Frenchizer myFrenchizer() { + return new Frenchizer(); + } + + public static void main(String[] args) throws Exception { + SpringApplication application = new FunctionalSpringApplication( + FunctionInitializer.class); + application.run(args); + } + + @Override + public void initialize(GenericApplicationContext context) { + // TODO: support for FunctionRegistration + context.registerBean("myDoubler", Doubler.class, () -> myDoubler()); + context.registerBean("myFrenchizer", Frenchizer.class, () -> myFrenchizer()); + } +} diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionRegistrar.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionRegistrar.java index 2141ab974..7438174ef 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionRegistrar.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/test/FunctionRegistrar.java @@ -16,10 +16,10 @@ package org.springframework.cloud.function.test; -import java.util.Collections; - import org.springframework.boot.SpringApplication; -import org.springframework.context.ApplicationContext; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionType; +import org.springframework.cloud.function.context.FunctionalSpringApplication; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.annotation.Bean; import org.springframework.context.support.GenericApplicationContext; @@ -41,21 +41,21 @@ public class FunctionRegistrar } public static void main(String[] args) throws Exception { - SpringApplication application = new SpringApplication(Object.class) { - @Override - protected void load(ApplicationContext context, Object[] sources) { - } - }; - application.addInitializers(new FunctionRegistrar()); - application.setDefaultProperties( - Collections.singletonMap("spring.functional.enabled", "true")); + SpringApplication application = new FunctionalSpringApplication( + FunctionRegistrar.class); application.run(args); } @Override public void initialize(GenericApplicationContext context) { - // TODO: support for FunctionRegistration - context.registerBean("myDoubler", Doubler.class, () -> myDoubler()); - context.registerBean("myFrenchizer", Frenchizer.class, () -> myFrenchizer()); + context.registerBean("theDoubler", FunctionRegistration.class, + () -> new FunctionRegistration<>(myDoubler(), "doubler") + .type(FunctionType.of((Doubler.class)))); + context.registerBean("frenchizer", FunctionRegistration.class, () -> { + Frenchizer function = myFrenchizer(); + function.init(); + return new FunctionRegistration<>(function, "theFrenchizer") + .type(FunctionType.of((Frenchizer.class))); + }); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index 635bb2c60..4f304a219 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -16,10 +16,13 @@ package org.springframework.cloud.function.web; +import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -38,15 +41,29 @@ import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FluxWrapper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.cloud.function.web.util.HeaderUtils; +import org.springframework.core.MethodParameter; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ResolvableType; +import org.springframework.core.codec.DecodingException; +import org.springframework.core.codec.Hints; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity.BodyBuilder; +import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.codec.ServerCodecConfigurer; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.messaging.Message; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.ServerWebInputException; +import org.springframework.web.server.UnsupportedMediaTypeStatusException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -65,11 +82,16 @@ public class RequestProcessor { private final JsonMapper mapper; - public RequestProcessor(FunctionInspector inspector, ObjectProvider mapper, - StringConverter converter) { + private final List> messageReaders; + + public RequestProcessor(FunctionInspector inspector, + ObjectProvider mapper, StringConverter converter, + ObjectProvider codecs) { this.mapper = mapper.getIfAvailable(); this.inspector = inspector; this.converter = converter; + ServerCodecConfigurer source = codecs.getIfAvailable(); + this.messageReaders = source == null ? null : source.getReaders(); } public static FunctionWrapper wrapper( @@ -95,6 +117,12 @@ public class RequestProcessor { } } + public Mono> post(FunctionWrapper wrapper, + ServerWebExchange exchange) { + return Mono.from(body(wrapper.handler(), exchange)) + .flatMap(body -> response(wrapper, body, false)); + } + public Mono> post(FunctionWrapper wrapper, String body, boolean stream) { Object function = wrapper.handler(); @@ -102,12 +130,15 @@ public class RequestProcessor { Type itemType = getItemType(function); Object input = body; - if (StringUtils.hasText(body) && this.mapper!=null) { + if (StringUtils.hasText(body) && this.mapper != null) { if (body.startsWith("[")) { - Class collectionType = Collection.class.isAssignableFrom(inputType) ? inputType : Collection.class; - input = mapper.toObject(body, ResolvableType - .forClassWithGenerics(collectionType, (Class) itemType) - .getType()); + Class collectionType = Collection.class.isAssignableFrom(inputType) + ? inputType + : Collection.class; + input = mapper.toObject(body, + ResolvableType + .forClassWithGenerics(collectionType, (Class) itemType) + .getType()); } else { if (inputType == String.class) { @@ -124,7 +155,7 @@ public class RequestProcessor { } } } - return post(wrapper, input, null, stream); + return response(wrapper, input, stream); } public Mono> stream(FunctionWrapper request) { @@ -134,19 +165,17 @@ public class RequestProcessor { return stream(request, result); } - private Mono> post(FunctionWrapper wrapper, Object body, - MultiValueMap params, boolean stream) { + private Mono> response(FunctionWrapper wrapper, Object body, + boolean stream) { Iterable iterable = body instanceof Collection ? (Collection) body - : (body instanceof Set ? Collections.singleton(body) : Collections.singletonList(body)); + : (body instanceof Set ? Collections.singleton(body) + : Collections.singletonList(body)); Function, Publisher> function = wrapper.function(); Consumer> consumer = wrapper.consumer(); MultiValueMap form = wrapper.params(); - if (params != null) { - form.putAll(params); - } boolean inputIsCollection = Collection.class .isAssignableFrom(inspector.getInputType(wrapper.handler())); @@ -250,6 +279,98 @@ public class RequestProcessor { } } + private Publisher body(Object handler, ServerWebExchange exchange) { + + ResolvableType elementType = ResolvableType + .forClass(this.inspector.getInputType(handler)); + ResolvableType actualType = elementType; + Class resolvedType = elementType.resolve(); + ReactiveAdapter adapter = (resolvedType != null + ? getAdapterRegistry().getAdapter(resolvedType) + : null); + + ServerHttpRequest request = exchange.getRequest(); + ServerHttpResponse response = exchange.getResponse(); + + MediaType contentType = request.getHeaders().getContentType(); + MediaType mediaType = (contentType != null ? contentType + : MediaType.APPLICATION_OCTET_STREAM); + + if (logger.isDebugEnabled()) { + logger.debug(exchange.getLogPrefix() + (contentType != null + ? "Content-Type:" + contentType + : "No Content-Type, using " + MediaType.APPLICATION_OCTET_STREAM)); + } + boolean isBodyRequired = (adapter != null && !adapter.supportsEmpty()); + + MethodParameter bodyParam = new MethodParameter(handlerMethod(handler), 0); + for (HttpMessageReader reader : getMessageReaders()) { + if (reader.canRead(elementType, mediaType)) { + Map readHints = Hints.from(Hints.LOG_PREFIX_HINT, + exchange.getLogPrefix()); + if (adapter != null && adapter.isMultiValue()) { + if (logger.isDebugEnabled()) { + logger.debug( + exchange.getLogPrefix() + "0..N [" + elementType + "]"); + } + Flux flux = reader.read(actualType, elementType, request, response, + readHints); + flux = flux.onErrorResume( + ex -> Flux.error(handleReadError(bodyParam, ex))); + if (isBodyRequired) { + flux = flux.switchIfEmpty( + Flux.error(() -> handleMissingBody(bodyParam))); + } + return Mono.just(adapter.fromPublisher(flux)); + } + else { + // Single-value (with or without reactive type wrapper) + if (logger.isDebugEnabled()) { + logger.debug( + exchange.getLogPrefix() + "0..1 [" + elementType + "]"); + } + Mono mono = reader.readMono(actualType, elementType, request, + response, readHints); + mono = mono.onErrorResume( + ex -> Mono.error(handleReadError(bodyParam, ex))); + if (isBodyRequired) { + mono = mono.switchIfEmpty( + Mono.error(() -> handleMissingBody(bodyParam))); + } + return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) + : Mono.from(mono)); + } + } + } + + return Mono.error(new UnsupportedMediaTypeStatusException(mediaType, + Arrays.asList(MediaType.APPLICATION_JSON), elementType)); + } + + private Method handlerMethod(Object handler) { + return ReflectionUtils.findMethod(handler.getClass(), "apply", (Class[]) null); + } + + public List> getMessageReaders() { + return this.messageReaders; + } + + private Throwable handleReadError(MethodParameter parameter, Throwable ex) { + return (ex instanceof DecodingException + ? new ServerWebInputException("Failed to read HTTP message", parameter, + ex) + : ex); + } + + private ServerWebInputException handleMissingBody(MethodParameter param) { + return new ServerWebInputException( + "Request body is missing: " + param.getExecutable().toGenericString()); + } + + private ReactiveAdapterRegistry getAdapterRegistry() { + return ReactiveAdapterRegistry.getSharedInstance(); + } + private Publisher value(Function, Publisher> function, Publisher value) { Flux input = Flux.from(value).map(body -> converter.convert(function, body)); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 34a32a573..fa8b99599 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -83,6 +83,13 @@ public class FunctionController { return map; } + @PostMapping(path = "/**", consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE) + @ResponseBody + public Mono> post(ServerWebExchange request) { + FunctionWrapper wrapper = wrapper(request); + return processor.post(wrapper, request); + } + @PostMapping(path = "/**") @ResponseBody public Mono> post(ServerWebExchange request, diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java index b36f69e5d..c78c470cb 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java @@ -106,7 +106,8 @@ class FunctionEndpointInitializer context.registerBean(RequestProcessor.class, () -> new RequestProcessor(context.getBean(FunctionInspector.class), context.getBeanProvider(JsonMapper.class), - context.getBean(StringConverter.class))); + context.getBean(StringConverter.class), + context.getBeanProvider(ServerCodecConfigurer.class))); context.registerBean(FunctionEndpointFactory.class, () -> new FunctionEndpointFactory(context.getBean(FunctionCatalog.class), context.getBean(FunctionInspector.class), @@ -226,7 +227,8 @@ class FunctionEndpointFactory { logger.info("Found functions: " + names); if (handler != null) { logger.info("Configured function: " + handler); - Assert.isTrue(names.contains(handler), "Cannot locate function: " + handler); + Assert.isTrue(names.contains(handler), + "Cannot locate function: " + handler); return catalog.lookup(Function.class, handler); } return catalog.lookup(Function.class, names.iterator().next());