Support for isolated class loaders extended to cover more functions

Functions with Flux and Message (as well as POJOs and Flux of POJO
which were already supported) should now work if they are created in
an isolated class loader. Preconditions:

* The class loaders must have the reactor-core (and reactive-streams)
shared between the app and the function. Practically speaking this means
there has to be a parent class loader with just reactive types, and
sibling children for the app and the function. This is not a new
requirement (it was needed for Flux of POJO anyway).

* Message types are handled reflectively, so they don't have to be in a
shared class loader. But they do have to be  on the class path on
both sides (obviously).
This commit is contained in:
Dave Syer
2018-02-16 08:16:55 +00:00
parent ccd3953163
commit 1b624c3531
17 changed files with 615 additions and 33 deletions

View File

@@ -457,6 +457,8 @@ public class ContextFunctionCatalogAutoConfiguration {
if (target instanceof Supplier) {
type = Supplier.class;
findType(target, ParamType.OUTPUT);
findType(target, ParamType.OUTPUT_WRAPPER);
isMessage(target);
registration.target(target((Supplier<?>) target, key));
for (String name : registration.getNames()) {
this.suppliers.put(name, (Supplier<?>) registration.getTarget());
@@ -465,6 +467,8 @@ public class ContextFunctionCatalogAutoConfiguration {
else if (target instanceof Consumer) {
type = Consumer.class;
findType(target, ParamType.INPUT);
findType(target, ParamType.INPUT_WRAPPER);
isMessage(target); // cache wrapper types
registration.target(target((Consumer<?>) target, key));
for (String name : registration.getNames()) {
this.consumers.put(name, (Consumer<?>) registration.getTarget());
@@ -474,6 +478,8 @@ public class ContextFunctionCatalogAutoConfiguration {
type = Function.class;
findType(target, ParamType.INPUT);
findType(target, ParamType.OUTPUT);
findType(target, ParamType.INPUT_WRAPPER);
findType(target, ParamType.OUTPUT_WRAPPER);
isMessage(target); // cache wrapper types
registration.target(target((Function<?, ?>) target, key));
for (String name : registration.getNames()) {

View File

@@ -0,0 +1,105 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.message;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import org.springframework.cloud.function.core.FluxWrapper;
import org.springframework.cloud.function.core.Isolated;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
/**
* @author Dave Syer
*
*/
public abstract class MessageUtils {
/**
* Create a message for the handler. If the handler is a wrapper for a function in an
* isolated class loader, then the message will be created with the target class
* loader (therefore the {@link Message} class must be on the classpath of the target
* class loader).
*
* @param handler the function that will be applied to the message
* @param payload the payload of the message
* @param headers the headers for the message
* @return a message with the correct class loader
*/
public static Object create(Object handler, Object payload,
Map<String, Object> headers) {
if (handler instanceof FluxWrapper) {
handler = ((FluxWrapper<?>) handler).getTarget();
}
if (!(handler instanceof Isolated)) {
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
}
ClassLoader classLoader = ((Isolated) handler).getClassLoader();
Class<?> builder = ClassUtils.resolveClassName(MessageBuilder.class.getName(),
classLoader);
Method withPayload = ClassUtils.getMethod(builder, "withPayload", Object.class);
Method copyHeaders = ClassUtils.getMethod(builder, "copyHeaders", Map.class);
Method build = ClassUtils.getMethod(builder, "build");
Object instance = ReflectionUtils.invokeMethod(withPayload, null, payload);
ReflectionUtils.invokeMethod(copyHeaders, instance, headers);
return ReflectionUtils.invokeMethod(build, instance);
}
/**
* Convert a message from the handler into one that is safe to consume in the caller's
* class laoder. If the handler is a wrapper for a function in an isolated class
* loader, then the message will be created with the target class loader (therefore
* the {@link Message} class must be on the classpath of the target class loader).
*
* @param handler the function that generated the message
* @param message the message to convert
* @return a message with the correct class loader
*/
public static Message<?> unpack(Object handler, Object message) {
if (handler instanceof FluxWrapper) {
handler = ((FluxWrapper<?>) handler).getTarget();
}
if (!(handler instanceof Isolated)) {
if (message instanceof Message) {
return (Message<?>) message;
}
return MessageBuilder.withPayload(message).build();
}
ClassLoader classLoader = ((Isolated) handler).getClassLoader();
Class<?> type = ClassUtils.resolveClassName(Message.class.getName(), classLoader);
Object payload;
Map<String, Object> headers;
if (type.isAssignableFrom(message.getClass())) {
Method getPayload = ClassUtils.getMethod(type, "getPayload");
Method getHeaders = ClassUtils.getMethod(type, "getHeaders");
payload = ReflectionUtils.invokeMethod(getPayload, message);
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) ReflectionUtils
.invokeMethod(getHeaders, message);
headers = map;
} else {
payload = message;
headers = Collections.emptyMap();
}
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
}
}

View File

@@ -28,7 +28,7 @@ import reactor.core.publisher.Flux;
*
* @param <T> input type of target consumer
*/
public class FluxConsumer<T> implements Consumer<Flux<T>> {
public class FluxConsumer<T> implements Consumer<Flux<T>>, FluxWrapper<Consumer<T>> {
private final Consumer<T> consumer;
@@ -36,6 +36,11 @@ public class FluxConsumer<T> implements Consumer<Flux<T>> {
this.consumer = consumer;
}
@Override
public Consumer<T> getTarget() {
return this.consumer;
}
@Override
public void accept(Flux<T> input) {
input.subscribe(t -> consumer.accept(t));

View File

@@ -29,13 +29,18 @@ import reactor.core.publisher.Flux;
* @param <T> input type of target function
* @param <R> output type of target function
*/
public class FluxFunction<T, R> implements Function<Flux<T>, Flux<R>> {
public class FluxFunction<T, R> implements Function<Flux<T>, Flux<R>>, FluxWrapper<Function<T, R>> {
private final Function<T, R> function;
public FluxFunction(Function<T, R> function) {
this.function = function;
}
@Override
public Function<T, R> getTarget() {
return this.function;
}
@Override
public Flux<R> apply(Flux<T> input) {

View File

@@ -33,7 +33,7 @@ import reactor.core.publisher.Flux;
*
* @param <T> output type of target supplier
*/
public class FluxSupplier<T> implements Supplier<Flux<T>> {
public class FluxSupplier<T> implements Supplier<Flux<T>>, FluxWrapper<Supplier<T>> {
private final Supplier<T> supplier;
@@ -42,12 +42,17 @@ public class FluxSupplier<T> implements Supplier<Flux<T>> {
public FluxSupplier(Supplier<T> supplier) {
this(supplier, null);
}
public FluxSupplier(Supplier<T> supplier, Duration period) {
this.supplier = supplier;
this.period = period;
}
@Override
public Supplier<T> getTarget() {
return this.supplier;
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<T> get() {

View File

@@ -0,0 +1,27 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.core;
/**
* @author Dave Syer
*
*/
public interface FluxWrapper<T> {
T getTarget();
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.core;
/**
* @author Dave Syer
*
*/
public interface Isolated {
ClassLoader getClassLoader();
}

View File

@@ -24,18 +24,25 @@ import org.springframework.util.ClassUtils;
* @author Dave Syer
*
*/
public class IsolatedConsumer<T> implements Consumer<T> {
public class IsolatedConsumer<T> implements Consumer<T>, Isolated {
private final Consumer<T> consumer;
private final ClassLoader classLoader;
public IsolatedConsumer(Consumer<T> consumer) {
this.consumer = consumer;
this.classLoader = consumer.getClass().getClassLoader();
}
@Override
public ClassLoader getClassLoader() {
return this.classLoader;
}
@Override
public void accept(T item) {
ClassLoader context = ClassUtils
.overrideThreadContextClassLoader(consumer.getClass().getClassLoader());
.overrideThreadContextClassLoader(this.classLoader);
try {
consumer.accept(item);
}

View File

@@ -24,20 +24,27 @@ import org.springframework.util.ClassUtils;
* @author Dave Syer
*
*/
public class IsolatedFunction<S, T> implements Function<S, T> {
public class IsolatedFunction<S, T> implements Function<S, T>, Isolated {
private final Function<S, T> function;
private final ClassLoader classLoader;
public IsolatedFunction(Function<S, T> function) {
this.function = function;
this.classLoader = function.getClass().getClassLoader();
}
@Override
public ClassLoader getClassLoader() {
return this.classLoader;
}
@Override
public T apply(S item) {
ClassLoader context = ClassUtils
.overrideThreadContextClassLoader(function.getClass().getClassLoader());
.overrideThreadContextClassLoader(this.classLoader);
try {
return function.apply(item);
return this.function.apply(item);
}
finally {
ClassUtils.overrideThreadContextClassLoader(context);

View File

@@ -24,18 +24,25 @@ import org.springframework.util.ClassUtils;
* @author Dave Syer
*
*/
public class IsolatedSupplier<T> implements Supplier<T> {
public class IsolatedSupplier<T> implements Supplier<T>, Isolated {
private final Supplier<T> supplier;
private final ClassLoader classLoader;
public IsolatedSupplier(Supplier<T> supplier) {
this.supplier = supplier;
this.classLoader = supplier.getClass().getClassLoader();
}
@Override
public ClassLoader getClassLoader() {
return this.classLoader;
}
@Override
public T get() {
ClassLoader context = ClassUtils
.overrideThreadContextClassLoader(supplier.getClass().getClassLoader());
.overrideThreadContextClassLoader(this.classLoader);
try {
return supplier.get();
}

View File

@@ -27,6 +27,7 @@ import java.util.function.Function;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
@@ -91,6 +92,9 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
return flux.publish(values -> {
Flux<?> result = function
.apply(values.map(message -> convertInput(function).apply(message)));
if (this.functionInspector.isMessage(function)) {
result = result.map(message -> MessageUtils.unpack(function, message));
}
Flux<Map<String, Object>> aggregate = headers(values);
return result.withLatestFrom(aggregate, (p, m) -> message(p, m));
});
@@ -141,7 +145,7 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
.get(StreamConfigurationProperties.ROUTE_KEY);
name = stash(key);
}
if (name==null && defaultRoute != null) {
if (name == null && defaultRoute != null) {
name = stash(defaultRoute);
}
if (name == null) {
@@ -155,10 +159,10 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
else {
for (String candidate : names) {
Object function = functionCatalog.lookupFunction(candidate);
if (function==null) {
if (function == null) {
function = functionCatalog.lookupConsumer(candidate);
}
if (function==null) {
if (function == null) {
continue;
}
Class<?> inputType = functionInspector.getInputType(function);
@@ -202,8 +206,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
Class<?> inputType = functionInspector.getInputType(function);
return m -> {
if (functionInspector.isMessage(function)) {
return MessageBuilder.withPayload(convertPayload(inputType, m))
.copyHeaders(m.getHeaders()).build();
return MessageUtils.create(function, convertPayload(inputType, m),
m.getHeaders());
}
else {
return convertPayload(inputType, m);

View File

@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.endpoint.MessageProducerSupport;
@@ -86,7 +87,8 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
if (supplier != null) {
suppliers.add(name);
disposables.put(name,
supplier.get().subscribeOn(Schedulers.elastic()).subscribe(m -> send(name, m)));
supplier.get().subscribeOn(Schedulers.elastic())
.subscribe(m -> send(name, m)));
}
}
}
@@ -94,16 +96,10 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
}
private void send(String name, Object payload) {
Message<?> message;
if (payload instanceof Message) {
message = MessageBuilder.fromMessage((Message<?>) payload)
.setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name)
.build();
}
else {
message = MessageBuilder.withPayload(payload)
.setHeader(StreamConfigurationProperties.ROUTE_KEY, name).build();
}
Supplier<Flux<?>> supplier = functionCatalog.lookupSupplier(name);
Message<?> message = MessageUtils.unpack(supplier, payload);
message = MessageBuilder.fromMessage(message)
.setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name).build();
getOutputChannel().send(message);
}

View File

@@ -0,0 +1,154 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.function;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import org.springframework.util.StringUtils;
/**
* @author Dave Syer
*
*/
public class ClassLoaderUtils {
public static ClassLoader createClassLoader() {
URL[] urls = findClassPath();
if (urls.length == 1) {
URL[] classpath = extractClasspath(urls[0]);
if (classpath != null) {
urls = classpath;
}
}
List<URL> child = new ArrayList<>();
for (URL url : urls) {
child.add(url);
}
for (URL url : urls) {
if (isRoot(StringUtils.getFilename(clean(url.toString())))) {
child.remove(url);
}
}
ClassLoader base = ClassLoaderUtils.class.getClassLoader();
return new ParentLastURLClassLoader(child.toArray(new URL[0]), base);
}
private static URL[] extractClasspath(URL url) {
// This works for a jar indirection like in surefire and IntelliJ
if (url.toString().endsWith(".jar")) {
JarFile jar;
try {
jar = new JarFile(new File(url.toURI()));
String path = jar.getManifest().getMainAttributes()
.getValue("Class-Path");
if (path != null) {
List<URL> result = new ArrayList<>();
for (String element : path.split(" ")) {
result.add(new URL(element));
}
return result.toArray(new URL[0]);
}
}
catch (Exception e) {
}
}
return null;
}
private static String clean(String jar) {
// This works with fat jars like Spring Boot where the path elements look like
// jar:file:...something.jar!/.
return jar.endsWith("!/") ? jar.substring(0, jar.length() - 2) : jar;
}
private static URL[] findClassPath() {
return ((URLClassLoader) ClassLoaderUtils.class.getClassLoader()).getURLs();
}
private static boolean isRoot(String file) {
return file.startsWith("reactor-core") || file.startsWith("reactive-streams");
}
private static class ParentLastURLClassLoader extends ClassLoader {
private ChildURLClassLoader childClassLoader;
/**
* This class allows me to call findClass on a classloader
*/
private static class FindClassClassLoader extends ClassLoader {
public FindClassClassLoader(ClassLoader parent) {
super(parent);
}
@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
return super.findClass(name);
}
}
/**
* This class delegates (child then parent) for the findClass method for a
* URLClassLoader. We need this because findClass is protected in URLClassLoader
*/
private static class ChildURLClassLoader extends URLClassLoader {
private FindClassClassLoader realParent;
public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) {
super(urls, null);
this.realParent = realParent;
}
@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
try {
// first try to use the URLClassLoader findClass
return super.findClass(name);
}
catch (ClassNotFoundException e) {
// if that fails, we ask our real parent classloader to load the class
// (we give up)
return realParent.loadClass(name);
}
}
}
public ParentLastURLClassLoader(URL[] urls, ClassLoader parent) {
super(parent);
childClassLoader = new ChildURLClassLoader(urls,
new FindClassClassLoader(this.getParent()));
}
@Override
protected synchronized Class<?> loadClass(String name, boolean resolve)
throws ClassNotFoundException {
try {
// first we try to find a class inside the child classloader
return childClassLoader.findClass(name);
}
catch (ClassNotFoundException e) {
// didn't find it, try the parent
return super.loadClass(name, resolve);
}
}
}
}

View File

@@ -0,0 +1,115 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.function;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.ClassUtils;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = IsolatedFluxMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class)
public class IsolatedFluxMessagePojoStreamingFunctionTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Test
public void test() throws Exception {
processor.input().send(
MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
TimeUnit.MILLISECONDS);
assertThat(result).isInstanceOf(Message.class);
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Autowired
private FunctionRegistry registry;
@PostConstruct
public void register() {
// TODO: this class loader doesn't really test the isolation properly. Not
// sure why, but if you remove the reflection in MessageUtils the test is
// still green.
ClassLoader loader = ClassLoaderUtils.createClassLoader();
Class<?> type = ClassUtils.resolveClassName(Uppercase.class.getName(),
loader);
registry.register(
new FunctionRegistration<Object>(BeanUtils.instantiate(type))
.name("uppercase"));
}
}
public static class Uppercase
implements Function<Flux<Message<Foo>>, Flux<Message<Foo>>> {
@Override
public Flux<Message<Foo>> apply(Flux<Message<Foo>> flux) {
return flux.map(message -> MessageBuilder
.withPayload(new Foo(message.getPayload().getName().toUpperCase()))
.build());
}
}
protected static class Foo {
private String name;
Foo() {
}
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.function;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.ClassUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = IsolatedMessagePojoStreamingFunctionTests.StreamingFunctionApplication.class)
public class IsolatedMessagePojoStreamingFunctionTests {
@Autowired
Processor processor;
@Autowired
MessageCollector messageCollector;
@Test
public void test() throws Exception {
processor.input().send(
MessageBuilder.withPayload(new String("{\"name\":\"foo\"}")).build());
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000,
TimeUnit.MILLISECONDS);
assertThat(result.getPayload().getClass().getName())
.isEqualTo(Foo.class.getName());
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Autowired
private FunctionRegistry registry;
@PostConstruct
public void register() {
ClassLoader loader = ClassLoaderUtils.createClassLoader();
Class<?> type = ClassUtils.resolveClassName(Uppercase.class.getName(),
loader);
registry.register(
new FunctionRegistration<Object>(BeanUtils.instantiate(type))
.name("uppercase"));
}
}
public static class Uppercase implements Function<Message<Foo>, Message<Foo>> {
@Override
public Message<Foo> apply(Message<Foo> flux) {
return MessageBuilder
.withPayload(new Foo(flux.getPayload().getName().toUpperCase()))
.build();
}
}
protected static class Foo {
private String name;
Foo() {
}
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.web.flux.request.FluxRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -72,6 +73,9 @@ public class FunctionController {
flux = flux.log();
}
Flux<?> result = Flux.from(function.apply(flux));
if (inspector.isMessage(function)) {
result = result.map(message -> MessageUtils.unpack(function, message));
}
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with function");
}

View File

@@ -30,13 +30,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.web.flux.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StreamUtils;
import org.springframework.web.bind.support.WebDataBinderFactory;
import org.springframework.web.context.request.NativeWebRequest;
@@ -106,12 +107,10 @@ public class FluxHandlerMethodArgumentResolver
}
if (message) {
List<Object> messages = new ArrayList<>();
MessageHeaders headers = HeaderUtils.fromHttp(new ServletServerHttpRequest(
webRequest.getNativeRequest(HttpServletRequest.class)).getHeaders());
for (Object payload : body) {
messages.add(MessageBuilder.withPayload(payload)
.copyHeaders(HeaderUtils.fromHttp(new ServletServerHttpRequest(
webRequest.getNativeRequest(HttpServletRequest.class))
.getHeaders()))
.build());
messages.add(MessageUtils.create(handler, payload, headers));
}
body = messages;
}