diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle
index 37e85a3d6e..ada7330be6 100644
--- a/spring-messaging/spring-messaging.gradle
+++ b/spring-messaging/spring-messaging.gradle
@@ -25,6 +25,7 @@ dependencies {
}
testCompile("org.apache.activemq:activemq-stomp:5.8.0")
testCompile("io.projectreactor:reactor-test")
+ testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
testCompile("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
testCompile("org.xmlunit:xmlunit-matchers:2.6.2")
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java
new file mode 100644
index 0000000000..6aad436572
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.messaging.handler.annotation.support.reactive;
+
+import java.lang.annotation.Annotation;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.Conventions;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ReactiveAdapter;
+import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.ResolvableType;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.core.codec.Decoder;
+import org.springframework.core.codec.DecodingException;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
+import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
+import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.MimeType;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.StringUtils;
+import org.springframework.validation.BeanPropertyBindingResult;
+import org.springframework.validation.SmartValidator;
+import org.springframework.validation.Validator;
+import org.springframework.validation.annotation.Validated;
+
+/**
+ * A resolver to extract and decode the payload of a message using a
+ * {@link Decoder}, where the payload is expected to be a {@link Publisher} of
+ * {@link DataBuffer DataBuffer}.
+ *
+ *
Validation is applied if the method argument is annotated with
+ * {@code @javax.validation.Valid} or
+ * {@link org.springframework.validation.annotation.Validated}. Validation
+ * failure results in an {@link MethodArgumentNotValidException}.
+ *
+ *
This resolver should be ordered last if {@link #useDefaultResolution} is
+ * set to {@code true} since in that case it supports all types and does not
+ * require the presence of {@link Payload}.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.2
+ */
+public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResolver {
+
+ protected final Log logger = LogFactory.getLog(getClass());
+
+
+ private final List> decoders;
+
+ @Nullable
+ private final Validator validator;
+
+ private final ReactiveAdapterRegistry adapterRegistry;
+
+ private final boolean useDefaultResolution;
+
+
+ public PayloadMethodArgumentResolver(List extends Decoder>> decoders, @Nullable Validator validator,
+ @Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) {
+
+ Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required.");
+ this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders));
+ this.validator = validator;
+ this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance();
+ this.useDefaultResolution = useDefaultResolution;
+ }
+
+
+ /**
+ * Return a read-only list of the configured decoders.
+ */
+ public List> getDecoders() {
+ return this.decoders;
+ }
+
+ /**
+ * Return the configured validator, if any.
+ */
+ @Nullable
+ public Validator getValidator() {
+ return this.validator;
+ }
+
+ /**
+ * Return the configured {@link ReactiveAdapterRegistry}.
+ */
+ public ReactiveAdapterRegistry getAdapterRegistry() {
+ return this.adapterRegistry;
+ }
+
+ /**
+ * Whether this resolver is configured to use default resolution, i.e.
+ * works for any argument type regardless of whether {@code @Payload} is
+ * present or not.
+ */
+ public boolean isUseDefaultResolution() {
+ return this.useDefaultResolution;
+ }
+
+
+ @Override
+ public boolean supportsParameter(MethodParameter parameter) {
+ return parameter.hasParameterAnnotation(Payload.class) || this.useDefaultResolution;
+ }
+
+
+ /**
+ * Decode the content of the given message payload through a compatible
+ * {@link Decoder}.
+ *
+ * Validation is applied if the method argument is annotated with
+ * {@code @javax.validation.Valid} or
+ * {@link org.springframework.validation.annotation.Validated}. Validation
+ * failure results in an {@link MethodArgumentNotValidException}.
+ *
+ * @param parameter the target method argument that we are decoding to
+ * @param message the message from which the content was extracted
+ * @return a Mono with the result of argument resolution
+ *
+ * @see #extractPayloadContent(MethodParameter, Message)
+ * @see #getMimeType(Message)
+ */
+ @Override
+ public final Mono resolveArgument(MethodParameter parameter, Message> message) {
+ Payload ann = parameter.getParameterAnnotation(Payload.class);
+ if (ann != null && StringUtils.hasText(ann.expression())) {
+ throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
+ }
+ Publisher content = extractPayloadContent(parameter, message);
+ return decodeContent(parameter, message, ann == null || ann.required(), content, getMimeType(message));
+ }
+
+ /**
+ * Extract the content to decode from the message. By default, the message
+ * payload is expected to be {@code Publisher}. Sub-classes can
+ * override this method to change that assumption.
+ * @param parameter the target method parameter we're decoding to
+ * @param message the input message with the content
+ * @return the content to decode
+ */
+ @SuppressWarnings("unchecked")
+ protected Publisher extractPayloadContent(MethodParameter parameter, Message> message) {
+ Publisher content;
+ try {
+ content = (Publisher) message.getPayload();
+ }
+ catch (ClassCastException ex) {
+ throw new MethodArgumentResolutionException(
+ message, parameter, "Expected Publisher payload", ex);
+ }
+ return content;
+ }
+
+ /**
+ * Return the mime type for the content. By default this method checks the
+ * {@link MessageHeaders#CONTENT_TYPE} header expecting to find a
+ * {@link MimeType} value or a String to parse to a {@link MimeType}.
+ * @param message the input message
+ */
+ @Nullable
+ protected MimeType getMimeType(Message> message) {
+ Object headerValue = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
+ if (headerValue == null) {
+ return null;
+ }
+ else if (headerValue instanceof String) {
+ return MimeTypeUtils.parseMimeType((String) headerValue);
+ }
+ else if (headerValue instanceof MimeType) {
+ return (MimeType) headerValue;
+ }
+ else {
+ throw new IllegalArgumentException("Unexpected MimeType value: " + headerValue);
+ }
+ }
+
+ private Mono decodeContent(MethodParameter parameter, Message> message,
+ boolean isContentRequired, Publisher content, @Nullable MimeType mimeType) {
+
+ ResolvableType targetType = ResolvableType.forMethodParameter(parameter);
+ Class> resolvedType = targetType.resolve();
+ ReactiveAdapter adapter = (resolvedType != null ? getAdapterRegistry().getAdapter(resolvedType) : null);
+ ResolvableType elementType = (adapter != null ? targetType.getGeneric() : targetType);
+ isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty());
+ Consumer validator = getValidator(message, parameter);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mime type:" + mimeType);
+ }
+ mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
+
+ for (Decoder> decoder : this.decoders) {
+ if (decoder.canDecode(elementType, mimeType)) {
+ if (adapter != null && adapter.isMultiValue()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("0..N [" + elementType + "]");
+ }
+ Flux> flux = decoder.decode(content, elementType, mimeType, Collections.emptyMap());
+ flux = flux.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
+ if (isContentRequired) {
+ flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
+ }
+ if (validator != null) {
+ flux = flux.doOnNext(validator::accept);
+ }
+ return Mono.just(adapter.fromPublisher(flux));
+ }
+ else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("0..1 [" + elementType + "]");
+ }
+ // Single-value (with or without reactive type wrapper)
+ Mono> mono = decoder.decodeToMono(content, targetType, mimeType, Collections.emptyMap());
+ mono = mono.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
+ if (isContentRequired) {
+ mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
+ }
+ if (validator != null) {
+ mono = mono.doOnNext(validator::accept);
+ }
+ return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono));
+ }
+ }
+ }
+
+ return Mono.error(new MethodArgumentResolutionException(
+ message, parameter, "Cannot decode to [" + targetType + "]" + message));
+ }
+
+ private Throwable handleReadError(MethodParameter parameter, Message> message, Throwable ex) {
+ return ex instanceof DecodingException ?
+ new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex;
+ }
+
+ private MethodArgumentResolutionException handleMissingBody(MethodParameter param, Message> message) {
+ return new MethodArgumentResolutionException(message, param,
+ "Payload content is missing: " + param.getExecutable().toGenericString());
+ }
+
+ @Nullable
+ private Consumer getValidator(Message> message, MethodParameter parameter) {
+ if (this.validator == null) {
+ return null;
+ }
+ for (Annotation ann : parameter.getParameterAnnotations()) {
+ Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
+ if (validatedAnn != null || ann.annotationType().getSimpleName().startsWith("Valid")) {
+ Object hints = (validatedAnn != null ? validatedAnn.value() : AnnotationUtils.getValue(ann));
+ Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints});
+ String name = Conventions.getVariableNameForParameter(parameter);
+ return target -> {
+ BeanPropertyBindingResult bindingResult = new BeanPropertyBindingResult(target, name);
+ if (!ObjectUtils.isEmpty(validationHints) && this.validator instanceof SmartValidator) {
+ ((SmartValidator) this.validator).validate(target, bindingResult, validationHints);
+ }
+ else {
+ this.validator.validate(target, bindingResult);
+ }
+ if (bindingResult.hasErrors()) {
+ throw new MethodArgumentNotValidException(message, parameter, bindingResult);
+ }
+ };
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/package-info.java
new file mode 100644
index 0000000000..41f18a41cd
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/package-info.java
@@ -0,0 +1,10 @@
+/**
+ * Support classes for working with annotated message-handling methods with
+ * non-blocking, reactive contracts.
+ */
+@NonNullApi
+@NonNullFields
+package org.springframework.messaging.handler.annotation.support.reactive;
+
+import org.springframework.lang.NonNullApi;
+import org.springframework.lang.NonNullFields;
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/MethodArgumentResolutionException.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/MethodArgumentResolutionException.java
index 0d03ea2c98..a8302bd48b 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/MethodArgumentResolutionException.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/MethodArgumentResolutionException.java
@@ -17,6 +17,7 @@
package org.springframework.messaging.handler.invocation;
import org.springframework.core.MethodParameter;
+import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
@@ -51,6 +52,17 @@ public class MethodArgumentResolutionException extends MessagingException {
this.parameter = parameter;
}
+ /**
+ * Create a new instance providing the invalid {@code MethodParameter},
+ * prepared description, and a cause.
+ */
+ public MethodArgumentResolutionException(
+ Message> message, MethodParameter parameter, String description, @Nullable Throwable cause) {
+
+ super(message, getMethodParameterMessage(parameter) + ": " + description, cause);
+ this.parameter = parameter;
+ }
+
/**
* Return the MethodParameter that was rejected.
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java
new file mode 100644
index 0000000000..d7a3aa96b4
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.messaging.handler.invocation.reactive;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ReactiveAdapter;
+import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.ResolvableType;
+import org.springframework.core.codec.Encoder;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+import org.springframework.util.Assert;
+
+/**
+ * Base class for a return value handler that encodes the return value, possibly
+ * a {@link Publisher} of values, to a {@code Flux} through a
+ * compatible {@link Encoder}.
+ *
+ * Sub-classes must implement the abstract method
+ * {@link #handleEncodedContent} to do something with the resulting encoded
+ * content.
+ *
+ *
This handler should be ordered last since its {@link #supportsReturnType}
+ * returns {@code true} for any method parameter type.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.2
+ */
+public abstract class AbstractEncoderMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
+
+ private static final ResolvableType VOID_RESOLVABLE_TYPE = ResolvableType.forClass(Void.class);
+
+ private static final ResolvableType OBJECT_RESOLVABLE_TYPE = ResolvableType.forClass(Object.class);
+
+
+ protected final Log logger = LogFactory.getLog(getClass());
+
+
+ private final List> encoders;
+
+ private final ReactiveAdapterRegistry adapterRegistry;
+
+ // TODO: configure or passed via MessageHeaders
+ private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+
+
+ protected AbstractEncoderMethodReturnValueHandler(List> encoders, ReactiveAdapterRegistry registry) {
+ Assert.notEmpty(encoders, "At least one Encoder is required");
+ Assert.notNull(registry, "ReactiveAdapterRegistry is required");
+ this.encoders = Collections.unmodifiableList(encoders);
+ this.adapterRegistry = registry;
+ }
+
+
+ /**
+ * The configured encoders.
+ */
+ public List> getEncoders() {
+ return this.encoders;
+ }
+
+ /**
+ * The configured adapter registry.
+ */
+ public ReactiveAdapterRegistry getAdapterRegistry() {
+ return this.adapterRegistry;
+ }
+
+
+ @Override
+ public boolean supportsReturnType(MethodParameter returnType) {
+ return true;
+ }
+
+ @Override
+ public Mono handleReturnValue(Object returnValue, MethodParameter returnType, Message> message) {
+ Flux encodedContent = encodeContent(returnValue, returnType, this.bufferFactory);
+ return handleEncodedContent(encodedContent, returnType, message);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Flux encodeContent(@Nullable Object content, MethodParameter returnType,
+ DataBufferFactory bufferFactory) {
+
+ ResolvableType bodyType = ResolvableType.forMethodParameter(returnType);
+ ReactiveAdapter adapter = getAdapterRegistry().getAdapter(bodyType.resolve(), content);
+
+ Publisher> publisher;
+ ResolvableType elementType;
+ if (adapter != null) {
+ publisher = adapter.toPublisher(content);
+ ResolvableType genericType = bodyType.getGeneric();
+ elementType = getElementType(adapter, genericType);
+ }
+ else {
+ publisher = Mono.justOrEmpty(content);
+ elementType = (bodyType.toClass() == Object.class && content != null ?
+ ResolvableType.forInstance(content) : bodyType);
+ }
+
+ if (elementType.resolve() == void.class || elementType.resolve() == Void.class) {
+ return Flux.from(publisher).cast(DataBuffer.class);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug((publisher instanceof Mono ? "0..1" : "0..N") + " [" + elementType + "]");
+ }
+
+ for (Encoder> encoder : getEncoders()) {
+ if (encoder.canEncode(elementType, null)) {
+ Map hints = Collections.emptyMap();
+ return encoder.encode((Publisher) publisher, bufferFactory, elementType, null, hints);
+ }
+ }
+
+ return Flux.error(new MessagingException("No encoder for " + returnType));
+ }
+
+ private ResolvableType getElementType(ReactiveAdapter adapter, ResolvableType genericType) {
+ if (adapter.isNoValue()) {
+ return VOID_RESOLVABLE_TYPE;
+ }
+ else if (genericType != ResolvableType.NONE) {
+ return genericType;
+ }
+ else {
+ return OBJECT_RESOLVABLE_TYPE;
+ }
+ }
+
+ /**
+ * Handle the encoded content in some way, e.g. wrapping it in a message and
+ * passing it on for further processing.
+ * @param encodedContent the result of data encoding
+ * @param returnType return type of the handler method that produced the data
+ * @param message the input message handled by the handler method
+ * @return completion {@code Mono} for the handling
+ */
+ protected abstract Mono handleEncodedContent(
+ Flux encodedContent, MethodParameter returnType, Message> message);
+
+}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java
new file mode 100644
index 0000000000..0797440d06
--- /dev/null
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.messaging.handler.annotation.support.reactive;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ResolvableType;
+import org.springframework.core.codec.Decoder;
+import org.springframework.core.codec.StringDecoder;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
+import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
+import org.springframework.messaging.handler.invocation.ResolvableMethod;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
+import org.springframework.validation.annotation.Validated;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Unit tests for {@link PayloadMethodArgumentResolver}.
+ *
+ * @author Rossen Stoyanchev
+ */
+public class PayloadMethodArgumentResolverTests {
+
+ private final List> decoders = new ArrayList<>();
+
+ private final ResolvableMethod testMethod = ResolvableMethod.on(getClass()).named("handle").build();
+
+
+ @Test
+ public void supportsParameter() {
+
+ boolean useDefaultResolution = true;
+ PayloadMethodArgumentResolver resolver = createResolver(null, useDefaultResolution);
+
+ assertTrue(resolver.supportsParameter(this.testMethod.annotPresent(Payload.class).arg()));
+ assertTrue(resolver.supportsParameter(this.testMethod.annotNotPresent(Payload.class).arg(String.class)));
+
+ useDefaultResolution = false;
+ resolver = createResolver(null, useDefaultResolution);
+
+ assertTrue(resolver.supportsParameter(this.testMethod.annotPresent(Payload.class).arg()));
+ assertFalse(resolver.supportsParameter(this.testMethod.annotNotPresent(Payload.class).arg(String.class)));
+ }
+
+ @Test
+ public void emptyBodyWhenRequired() {
+ MethodParameter param = this.testMethod.arg(ResolvableType.forClassWithGenerics(Mono.class, String.class));
+ Mono mono = resolveValue(param, Mono.empty(), null);
+
+ StepVerifier.create(mono)
+ .consumeErrorWith(ex -> {
+ assertEquals(MethodArgumentResolutionException.class, ex.getClass());
+ assertTrue(ex.getMessage(), ex.getMessage().contains("Payload content is missing"));
+ })
+ .verify();
+ }
+
+ @Test
+ public void emptyBodyWhenNotRequired() {
+ MethodParameter param = this.testMethod.annotPresent(Payload.class).arg();
+ assertNull(resolveValue(param, Mono.empty(), null));
+ }
+
+ @Test
+ public void stringMono() {
+ String body = "foo";
+ MethodParameter param = this.testMethod.arg(ResolvableType.forClassWithGenerics(Mono.class, String.class));
+ Mono value = Mono.delay(Duration.ofMillis(10)).map(aLong -> toDataBuffer(body));
+ Mono mono = resolveValue(param, value, null);
+
+ assertEquals(body, mono.block());
+ }
+
+ @Test
+ public void stringFlux() {
+ List body = Arrays.asList("foo", "bar");
+ ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class);
+ MethodParameter param = this.testMethod.arg(type);
+ Flux flux = resolveValue(param, Flux.fromIterable(body)
+ .delayElements(Duration.ofMillis(10)).map(value -> toDataBuffer(value + "\n")), null);
+
+ assertEquals(body, flux.collectList().block());
+ }
+
+ @Test
+ public void string() {
+ String body = "foo";
+ MethodParameter param = this.testMethod.annotNotPresent(Payload.class).arg(String.class);
+ Object value = resolveValue(param, Mono.just(toDataBuffer(body)), null);
+
+ assertEquals(body, value);
+ }
+
+ @Test
+ public void validateStringMono() {
+ ResolvableType type = ResolvableType.forClassWithGenerics(Mono.class, String.class);
+ MethodParameter param = this.testMethod.arg(type);
+ Mono mono = resolveValue(param, Mono.just(toDataBuffer("12345")), new TestValidator());
+
+ StepVerifier.create(mono).expectNextCount(0)
+ .expectError(MethodArgumentNotValidException.class).verify();
+ }
+
+ @Test
+ public void validateStringFlux() {
+ ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class);
+ MethodParameter param = this.testMethod.arg(type);
+ Flux flux = resolveValue(param, Flux.just(toDataBuffer("12345678\n12345")), new TestValidator());
+
+ StepVerifier.create(flux)
+ .expectNext("12345678")
+ .expectError(MethodArgumentNotValidException.class)
+ .verify();
+ }
+
+
+ private DataBuffer toDataBuffer(String value) {
+ return new DefaultDataBufferFactory().wrap(value.getBytes(StandardCharsets.UTF_8));
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ private T resolveValue(MethodParameter param, Publisher content, Validator validator) {
+
+ Message> message = new GenericMessage<>(content,
+ Collections.singletonMap(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN));
+
+ Mono result = createResolver(validator, true).resolveArgument(param, message);
+
+ Object value = result.block(Duration.ofSeconds(5));
+ if (value != null) {
+ Class> expectedType = param.getParameterType();
+ assertTrue("Unexpected return value type: " + value, expectedType.isAssignableFrom(value.getClass()));
+ }
+ return (T) value;
+ }
+
+ private PayloadMethodArgumentResolver createResolver(@Nullable Validator validator, boolean useDefaultResolution) {
+ if (this.decoders.isEmpty()) {
+ this.decoders.add(StringDecoder.allMimeTypes());
+ }
+ List decoders = Collections.singletonList(StringDecoder.allMimeTypes());
+ return new PayloadMethodArgumentResolver(decoders, validator, null, useDefaultResolution) {};
+ }
+
+
+ @SuppressWarnings("unused")
+ private void handle(
+ @Validated Mono valueMono,
+ @Validated Flux valueFlux,
+ @Payload(required = false) String optionalValue,
+ String value) {
+ }
+
+
+ private static class TestValidator implements Validator {
+
+ @Override
+ public boolean supports(Class> clazz) {
+ return clazz.equals(String.class);
+ }
+
+ @Override
+ public void validate(@Nullable Object target, Errors errors) {
+ if (target instanceof String && ((String) target).length() < 8) {
+ errors.reject("Invalid length");
+ }
+ }
+ }
+
+}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethodTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethodTests.java
index fbb0350d1b..e0c44543e6 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethodTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethodTests.java
@@ -23,7 +23,6 @@ import org.junit.Test;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
-import org.springframework.messaging.handler.ResolvableMethod;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/MethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/MethodMessageHandlerTests.java
index 54d6830be1..7dce680ee4 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/MethodMessageHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/MethodMessageHandlerTests.java
@@ -166,7 +166,7 @@ public class MethodMessageHandlerTests {
this.method = "secondBestMatch";
}
- public void illegalStateException(IllegalStateException exception) {
+ public void handleIllegalStateException(IllegalStateException exception) {
this.method = "illegalStateException";
this.arguments.put("exception", exception);
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/ResolvableMethod.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/ResolvableMethod.java
similarity index 97%
rename from spring-messaging/src/test/java/org/springframework/messaging/handler/ResolvableMethod.java
rename to spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/ResolvableMethod.java
index 7f4a367370..dd360bd6e0 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/ResolvableMethod.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/ResolvableMethod.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.messaging.handler;
+package org.springframework.messaging.handler.invocation;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
@@ -57,7 +57,10 @@ import org.springframework.util.ReflectionUtils;
import static java.util.stream.Collectors.*;
/**
- * Convenience class to resolve method parameters from hints.
+ * NOTE: This class is a replica of the same class in spring-web so it can
+ * be used for tests in spring-messaging.
+ *
+ * Convenience class to resolve method parameters from hints.
*
*
Background
*
@@ -120,7 +123,7 @@ import static java.util.stream.Collectors.*;
*
*
* @author Rossen Stoyanchev
- * @since 5.0
+ * @since 5.2
*/
public class ResolvableMethod {
@@ -186,6 +189,7 @@ public class ResolvableMethod {
/**
* Filter on method arguments with annotation.
+ * See {@link org.springframework.web.method.MvcAnnotationPredicates}.
*/
@SafeVarargs
public final ArgResolver annot(Predicate... filter) {
@@ -298,6 +302,7 @@ public class ResolvableMethod {
/**
* Filter on annotated methods.
+ * See {@link org.springframework.web.method.MvcAnnotationPredicates}.
*/
@SafeVarargs
public final Builder annot(Predicate... filters) {
@@ -308,6 +313,7 @@ public class ResolvableMethod {
/**
* Filter on methods annotated with the given annotation type.
* @see #annot(Predicate[])
+ * See {@link org.springframework.web.method.MvcAnnotationPredicates}.
*/
@SafeVarargs
public final Builder annotPresent(Class extends Annotation>... annotationTypes) {
@@ -524,6 +530,7 @@ public class ResolvableMethod {
/**
* Filter on method arguments with annotations.
+ * See {@link org.springframework.web.method.MvcAnnotationPredicates}.
*/
@SafeVarargs
public final ArgResolver annot(Predicate... filters) {
@@ -535,6 +542,7 @@ public class ResolvableMethod {
* Filter on method arguments that have the given annotations.
* @param annotationTypes the annotation types
* @see #annot(Predicate[])
+ * See {@link org.springframework.web.method.MvcAnnotationPredicates}.
*/
@SafeVarargs
public final ArgResolver annotPresent(Class extends Annotation>... annotationTypes) {
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
new file mode 100644
index 0000000000..b373d671c0
--- /dev/null
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.messaging.handler.invocation.reactive;
+
+import java.util.Collections;
+import java.util.List;
+
+import io.reactivex.Completable;
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.codec.CharSequenceEncoder;
+import org.springframework.core.codec.Encoder;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.support.DataBufferTestUtils;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+
+import static java.nio.charset.StandardCharsets.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import static org.springframework.messaging.handler.invocation.ResolvableMethod.*;
+
+/**
+ * Unit tests for {@link AbstractEncoderMethodReturnValueHandler}.
+ *
+ * @author Rossen Stoyanchev
+ */
+public class EncoderMethodReturnValueHandlerTests {
+
+ private final TestEncoderMethodReturnValueHandler handler = new TestEncoderMethodReturnValueHandler(
+ Collections.singletonList(CharSequenceEncoder.textPlainOnly()),
+ ReactiveAdapterRegistry.getSharedInstance());
+
+ private final Message> message = mock(Message.class);
+
+
+ @Test
+ public void stringReturnValue() {
+ MethodParameter parameter = on(TestController.class).resolveReturnType(String.class);
+ this.handler.handleReturnValue("foo", parameter, message).block();
+ Flux result = this.handler.encodedContent;
+
+ StepVerifier.create(result)
+ .consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
+ .verifyComplete();
+ }
+
+ @Test
+ public void objectReturnValue() {
+ MethodParameter parameter = on(TestController.class).resolveReturnType(Object.class);
+ this.handler.handleReturnValue("foo", parameter, message).block();
+ Flux result = this.handler.encodedContent;
+
+ StepVerifier.create(result)
+ .consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
+ .verifyComplete();
+ }
+
+ @Test
+ public void fluxStringReturnValue() {
+ MethodParameter parameter = on(TestController.class).resolveReturnType(Flux.class, String.class);
+ this.handler.handleReturnValue(Flux.just("foo", "bar"), parameter, message).block();
+ Flux result = this.handler.encodedContent;
+
+ StepVerifier.create(result)
+ .consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
+ .consumeNextWith(buffer -> assertEquals("bar", DataBufferTestUtils.dumpString(buffer, UTF_8)))
+ .verifyComplete();
+ }
+
+ @Test
+ public void voidReturnValue() {
+ testVoidReturnType(null, on(TestController.class).resolveReturnType(void.class));
+ testVoidReturnType(Mono.empty(), on(TestController.class).resolveReturnType(Mono.class, Void.class));
+ testVoidReturnType(Completable.complete(), on(TestController.class).resolveReturnType(Completable.class));
+
+ }
+
+ private void testVoidReturnType(@Nullable Object value, MethodParameter bodyParameter) {
+ this.handler.handleReturnValue(value, bodyParameter, message).block();
+ Flux result = this.handler.encodedContent;
+ StepVerifier.create(result).expectComplete().verify();
+ }
+
+ @Test
+ public void noEncoder() {
+ MethodParameter parameter = on(TestController.class).resolveReturnType(Object.class);
+ this.handler.handleReturnValue(new Object(), parameter, message).block();
+ Flux result = this.handler.encodedContent;
+
+ StepVerifier.create(result)
+ .expectErrorMessage("No encoder for method 'object' parameter -1")
+ .verify();
+ }
+
+
+ @SuppressWarnings({"unused", "ConstantConditions"})
+ private static class TestController {
+
+ String string() { return null; }
+
+ Object object() { return null; }
+
+ Flux fluxString() { return null; }
+
+ void voidReturn() { }
+
+ Mono monoVoid() { return null; }
+
+ Completable completable() { return null; }
+ }
+
+
+ private static class TestEncoderMethodReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
+
+ private Flux encodedContent;
+
+
+ public Flux getEncodedContent() {
+ return this.encodedContent;
+ }
+
+ protected TestEncoderMethodReturnValueHandler(List> encoders, ReactiveAdapterRegistry registry) {
+ super(encoders, registry);
+ }
+
+ @Override
+ protected Mono handleEncodedContent(
+ Flux encodedContent, MethodParameter returnType, Message> message) {
+
+ this.encodedContent = encodedContent;
+ return Mono.empty();
+ }
+ }
+
+}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/InvocableHandlerMethodTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/InvocableHandlerMethodTests.java
index c0e2d36f08..c41cae403b 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/InvocableHandlerMethodTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/InvocableHandlerMethodTests.java
@@ -29,8 +29,8 @@ import reactor.test.StepVerifier;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
-import org.springframework.messaging.handler.ResolvableMethod;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
+import org.springframework.messaging.handler.invocation.ResolvableMethod;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
diff --git a/spring-web/src/test/java/org/springframework/web/method/ResolvableMethod.java b/spring-web/src/test/java/org/springframework/web/method/ResolvableMethod.java
index edf2ab4ba7..14e4f7631b 100644
--- a/spring-web/src/test/java/org/springframework/web/method/ResolvableMethod.java
+++ b/spring-web/src/test/java/org/springframework/web/method/ResolvableMethod.java
@@ -57,7 +57,9 @@ import org.springframework.util.ReflectionUtils;
import static java.util.stream.Collectors.*;
/**
- * Convenience class to resolve method parameters from hints.
+ * Convenience class to resolve to a Method and method parameters.
+ *
+ * Note that a replica of this class also exists in spring-messaging.
*
*
Background
*