diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java index 375772b47..64c9c975a 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.context.message; import java.lang.reflect.Method; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.springframework.cloud.function.core.FluxWrapper; @@ -49,6 +50,11 @@ public abstract class MessageUtils { if (handler instanceof FluxWrapper) { handler = ((FluxWrapper) handler).getTarget(); } + if (payload instanceof Message) { + headers = new HashMap<>(headers); + headers.putAll(((Message) payload).getHeaders()); + payload = ((Message) payload).getPayload(); + } if (!(handler instanceof Isolated)) { return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); } diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java index 5bd4a2826..6ebbe69b9 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/ClassLoaderUtils.java @@ -23,13 +23,33 @@ import java.util.ArrayList; import java.util.List; import java.util.jar.JarFile; +import org.junit.Test; + +import org.springframework.messaging.Message; +import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; +import static org.assertj.core.api.Assertions.assertThat; + +import reactor.core.publisher.Flux; + /** * @author Dave Syer * */ public class ClassLoaderUtils { + + @Test + public void fluxIsShared() { + Class flux = ClassUtils.resolveClassName(Flux.class.getName(), createClassLoader()); + assertThat(flux).isEqualTo(Flux.class); + } + + @Test + public void messageIsNotShared() { + Class flux = ClassUtils.resolveClassName(Message.class.getName(), createClassLoader()); + assertThat(flux).isNotEqualTo(Message.class); + } public static ClassLoader createClassLoader() { URL[] urls = findClassPath(); diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessageUtilsTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessageUtilsTests.java new file mode 100644 index 000000000..bd537b4bc --- /dev/null +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/MessageUtilsTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream.function; + +import java.util.Collections; +import java.util.function.Function; + +import org.junit.Test; + +import org.springframework.beans.BeanUtils; +import org.springframework.cloud.function.context.message.MessageUtils; +import org.springframework.cloud.function.core.IsolatedFunction; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.ClassUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +public class MessageUtilsTests { + + private ClassLoader loader = ClassLoaderUtils.createClassLoader(); + + @Test + public void testCreateNotIsolated() throws Exception { + Object function = new Uppercase(); + Object output = MessageUtils.create(function, "foo", Collections.emptyMap()); + assertThat(output).isInstanceOf(Message.class); + } + + @Test + public void testUnpackNotIsolated() throws Exception { + Object function = new Uppercase(); + Object output = MessageUtils.unpack(function, + MessageBuilder.withPayload("foo").build()); + assertThat(output).isInstanceOf(Message.class); + } + + @Test + public void testUnpackNotIsolatedNotMessage() throws Exception { + Object function = new Uppercase(); + Object output = MessageUtils.unpack(function, "foo"); + assertThat(output).isInstanceOf(Message.class); + } + + @Test + public void testUnpackIsolated() throws Exception { + Object function = create(Uppercase.class); + Object output = MessageUtils.unpack(function, message(function, "foo")); + assertThat(output).isInstanceOf(Message.class); + } + + @Test + public void testUnpackIsolatedNotMessage() throws Exception { + Object function = create(Uppercase.class); + Object output = MessageUtils.unpack(function, "foo"); + assertThat(output).isInstanceOf(Message.class); + @SuppressWarnings("unchecked") + Message message = (Message)output; + assertThat(message.getPayload()).isEqualTo("foo"); + } + + @Test + public void testCreateIsolated() throws Exception { + Object function = create(Uppercase.class); + Object output = MessageUtils.create(function, "foo", Collections.emptyMap()); + assertThat(output).isNotInstanceOf(Message.class); + } + + private Object message(Object function, Object payload) { + return MessageUtils.create(function, payload, Collections.emptyMap()); + } + + private Object create(Class type) { + return new IsolatedFunction<>((Function) BeanUtils + .instantiate(ClassUtils.resolveClassName(type.getName(), loader))); + } + + public static class Uppercase implements Function, Message> { + @Override + public Message apply(Message message) { + return MessageBuilder.withPayload(message.getPayload().toUpperCase()) + .copyHeadersIfAbsent(message.getHeaders()).build(); + } + } +}