Temporary attempt on addressing deploying custon message converters

This commit is contained in:
Oleg Zhurakousky
2019-11-15 13:02:09 +01:00
parent b9cc254eea
commit e75bb67608
4 changed files with 96 additions and 1 deletions

View File

@@ -27,6 +27,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>

View File

@@ -5,6 +5,10 @@ import java.util.function.Function;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootApplication
public class SimpleFunctionAppApplication {
@@ -30,6 +34,24 @@ public class SimpleFunctionAppApplication {
};
}
@Bean
public MessageConverter customConverter() {
return new MessageConverter() {
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
System.out.println("==== In Custom Message Converer: toMessage");
return null;
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
System.out.println("==== In Custom Message Converer: fromMessage");
return null;
}
};
}
public static class Person {

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.function.deployer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.net.URL;
@@ -26,9 +27,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.loader.JarLauncher;
import org.springframework.boot.loader.LaunchedURLClassLoader;
@@ -41,6 +45,8 @@ import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.ReflectionUtils.MethodCallback;
@@ -77,6 +83,19 @@ class FunctionArchiveDeployer extends JarLauncher {
if (this.isBootApplicationWithMain()) {
this.launchFunctionArchive(args);
//=====
Map<String, MessageConverter> messageConverters = this.discoverMessageConverters(currentLoader);
if (!CollectionUtils.isEmpty(messageConverters)) {
Field mcField = functionRegistry.getClass().getDeclaredField("messageConverter");
mcField.setAccessible(true);
CompositeMessageConverter compositMessageConverter = (CompositeMessageConverter) mcField.get(functionRegistry);
List<MessageConverter> converters = compositMessageConverter.getConverters();
for (MessageConverter messageConverter : messageConverters.values()) {
converters.add(messageConverter);
}
}
//=====
Map<String, Object> functions = this.discoverBeanFunctions();
if (logger.isInfoEnabled() && !CollectionUtils.isEmpty(functions)) {
logger.info("Discovered functions in deployed application context: " + functions);
@@ -162,7 +181,8 @@ class FunctionArchiveDeployer extends JarLauncher {
private boolean shouldLoadViaDeployerLoader(String name) {
return name.startsWith("org.reactivestreams")
|| name.startsWith("reactor.");
|| name.startsWith("reactor.")
|| name.startsWith("org.springframework.messaging");
}
private String discoverFunctionClassName(FunctionDeployerProperties functionProperties) {
@@ -280,4 +300,38 @@ class FunctionArchiveDeployer extends JarLauncher {
}
return allFunctions;
}
@SuppressWarnings("unchecked")
private Map<String, MessageConverter> discoverMessageConverters(ClassLoader currentLoader) {
ClassLoader threadLoader = Thread.currentThread().getContextClassLoader();
Map<String, MessageConverter> exportedMessageConverters = new HashMap<>();
try {
if (evalContext.lookupVariable("context") != null) { // no start-class uber jars
Expression parsed = new SpelExpressionParser()
.parseExpression("#context.getBeansOfType(T(org.springframework.messaging.converter.MessageConverter))");
Thread.currentThread().setContextClassLoader(currentLoader);
Map<String, Object> targetMessageConverters = (Map<String, Object>) parsed.getValue(this.evalContext);
for (Entry<String, Object> messageConverterEntry : targetMessageConverters.entrySet()) {
ProxyFactory pf = new ProxyFactory(messageConverterEntry.getValue());
pf.setInterfaces(MessageConverter.class);
pf.addAdvice(new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
System.out.println("=====> Invoking proxy");
return invocation.proceed();
}
});
MessageConverter converter = (MessageConverter) pf.getProxy();
exportedMessageConverters.put(messageConverterEntry.getKey(), converter);
}
}
}
finally {
Thread.currentThread().setContextClassLoader(threadLoader);
}
return exportedMessageConverters;
}
}

View File

@@ -174,6 +174,20 @@ public class FunctionDeployerTests {
assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("\"BOB\"");
}
// @Test
// public void testWithMainAndStartClassAndSpringConfigurationCustomConverter() throws Exception {
// String[] args = new String[] {
// "--spring.cloud.function.location=target/it/bootapp/target/bootapp-1.0.0.RELEASE-exec.jar",
// "--spring.cloud.function.function-name=uppercase" };
// ApplicationContext context = SpringApplication.run(DeployerApplication.class, args);
// FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
// Function<Message<byte[]>, Message<byte[]>> function = catalog.lookup("uppercase", "application/json");
//
// Message<byte[]> result = function
// .apply(MessageBuilder.withPayload("\"bob\"".getBytes(StandardCharsets.UTF_8)).build());
// assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("\"BOB\"");
// }
@Test
public void testWithLegacyProperties() throws Exception {
String[] args = new String[] {