diff --git a/spring-cloud-function-deployer/src/it/bootapp/pom.xml b/spring-cloud-function-deployer/src/it/bootapp/pom.xml index 05829a20e..9d3e30a63 100644 --- a/spring-cloud-function-deployer/src/it/bootapp/pom.xml +++ b/spring-cloud-function-deployer/src/it/bootapp/pom.xml @@ -27,6 +27,11 @@ org.springframework.boot spring-boot-starter + + org.springframework + spring-messaging + provided + diff --git a/spring-cloud-function-deployer/src/it/bootapp/src/main/java/function/example/SimpleFunctionAppApplication.java b/spring-cloud-function-deployer/src/it/bootapp/src/main/java/function/example/SimpleFunctionAppApplication.java index ed8f694ef..b2fda7b35 100644 --- a/spring-cloud-function-deployer/src/it/bootapp/src/main/java/function/example/SimpleFunctionAppApplication.java +++ b/spring-cloud-function-deployer/src/it/bootapp/src/main/java/function/example/SimpleFunctionAppApplication.java @@ -5,6 +5,10 @@ import java.util.function.Function; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; @SpringBootApplication public class SimpleFunctionAppApplication { @@ -30,6 +34,24 @@ public class SimpleFunctionAppApplication { }; } + @Bean + public MessageConverter customConverter() { + return new MessageConverter() { + + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + System.out.println("==== In Custom Message Converer: toMessage"); + return null; + } + + @Override + public Object fromMessage(Message message, Class targetClass) { + System.out.println("==== In Custom Message Converer: fromMessage"); + return null; + } + }; + } + public static class Person { diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java index 1f2f588c8..4d2aacc8e 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java @@ -17,6 +17,7 @@ package org.springframework.cloud.function.deployer; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Type; import java.net.URL; @@ -26,9 +27,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.loader.JarLauncher; import org.springframework.boot.loader.LaunchedURLClassLoader; @@ -41,6 +45,8 @@ import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardTypeLocator; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.CollectionUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.ReflectionUtils.MethodCallback; @@ -77,6 +83,19 @@ class FunctionArchiveDeployer extends JarLauncher { if (this.isBootApplicationWithMain()) { this.launchFunctionArchive(args); + //===== + Map messageConverters = this.discoverMessageConverters(currentLoader); + if (!CollectionUtils.isEmpty(messageConverters)) { + Field mcField = functionRegistry.getClass().getDeclaredField("messageConverter"); + mcField.setAccessible(true); + CompositeMessageConverter compositMessageConverter = (CompositeMessageConverter) mcField.get(functionRegistry); + List converters = compositMessageConverter.getConverters(); + for (MessageConverter messageConverter : messageConverters.values()) { + converters.add(messageConverter); + } + } + //===== + Map functions = this.discoverBeanFunctions(); if (logger.isInfoEnabled() && !CollectionUtils.isEmpty(functions)) { logger.info("Discovered functions in deployed application context: " + functions); @@ -162,7 +181,8 @@ class FunctionArchiveDeployer extends JarLauncher { private boolean shouldLoadViaDeployerLoader(String name) { return name.startsWith("org.reactivestreams") - || name.startsWith("reactor."); + || name.startsWith("reactor.") + || name.startsWith("org.springframework.messaging"); } private String discoverFunctionClassName(FunctionDeployerProperties functionProperties) { @@ -280,4 +300,38 @@ class FunctionArchiveDeployer extends JarLauncher { } return allFunctions; } + + @SuppressWarnings("unchecked") + private Map discoverMessageConverters(ClassLoader currentLoader) { + ClassLoader threadLoader = Thread.currentThread().getContextClassLoader(); + Map exportedMessageConverters = new HashMap<>(); + try { + if (evalContext.lookupVariable("context") != null) { // no start-class uber jars + Expression parsed = new SpelExpressionParser() + .parseExpression("#context.getBeansOfType(T(org.springframework.messaging.converter.MessageConverter))"); + Thread.currentThread().setContextClassLoader(currentLoader); + Map targetMessageConverters = (Map) parsed.getValue(this.evalContext); + + for (Entry messageConverterEntry : targetMessageConverters.entrySet()) { + + ProxyFactory pf = new ProxyFactory(messageConverterEntry.getValue()); + pf.setInterfaces(MessageConverter.class); + pf.addAdvice(new MethodInterceptor() { + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + System.out.println("=====> Invoking proxy"); + return invocation.proceed(); + } + }); + MessageConverter converter = (MessageConverter) pf.getProxy(); + exportedMessageConverters.put(messageConverterEntry.getKey(), converter); + } + } + } + finally { + Thread.currentThread().setContextClassLoader(threadLoader); + } + + return exportedMessageConverters; + } } diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java index 907645cfe..caa757831 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java @@ -174,6 +174,20 @@ public class FunctionDeployerTests { assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("\"BOB\""); } +// @Test +// public void testWithMainAndStartClassAndSpringConfigurationCustomConverter() throws Exception { +// String[] args = new String[] { +// "--spring.cloud.function.location=target/it/bootapp/target/bootapp-1.0.0.RELEASE-exec.jar", +// "--spring.cloud.function.function-name=uppercase" }; +// ApplicationContext context = SpringApplication.run(DeployerApplication.class, args); +// FunctionCatalog catalog = context.getBean(FunctionCatalog.class); +// Function, Message> function = catalog.lookup("uppercase", "application/json"); +// +// Message result = function +// .apply(MessageBuilder.withPayload("\"bob\"".getBytes(StandardCharsets.UTF_8)).build()); +// assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("\"BOB\""); +// } + @Test public void testWithLegacyProperties() throws Exception { String[] args = new String[] {