From fcd427ca2969b77b4ab6f47f213cc9ccc512dc6f Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 5 Mar 2021 08:04:53 +0100 Subject: [PATCH] GH-663 Fix how strtategies are used to register additional encoders/decoders Resolves #663 --- .../FunctionRSocketMessageHandler.java | 15 ++-- ...oder.java => MessageAwareJsonDecoder.java} | 28 +++++--- ...oder.java => MessageAwareJsonEncoder.java} | 25 ++++--- .../rsocket/RSocketAutoConfiguration.java | 34 ++++----- .../RSocketRoutingAutoConfiguration.java | 4 +- .../rsocket/ServerMessageEncoder.java | 71 ------------------- .../function/rsocket/MessagingTests.java | 41 +++++++++++ .../RSocketAutoConfigurationTests.java | 16 +++++ 8 files changed, 114 insertions(+), 120 deletions(-) rename spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/{ClientMessageDecoder.java => MessageAwareJsonDecoder.java} (72%) rename spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/{ClientMessageEncoder.java => MessageAwareJsonEncoder.java} (80%) delete mode 100644 spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ServerMessageEncoder.java diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java index 11f872d2b..b67103c53 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketMessageHandler.java @@ -108,9 +108,11 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void afterPropertiesSet() { - setEncoders(Collections.singletonList(new ServerMessageEncoder(this.jsonMapper))); + List encoders = this.getEncoders(); + encoders.set(0, new MessageAwareJsonEncoder(this.jsonMapper)); super.afterPropertiesSet(); } @@ -248,9 +250,14 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler { // could be array, map or string Object structure = this.jsonMapper.fromJson(value, Object.class); if (structure instanceof Map) { - return MessageBuilder.withPayload(((Map) structure).remove(FunctionRSocketUtils.PAYLOAD)) - .copyHeaders((Map) ((Map) structure).get(FunctionRSocketUtils.HEADERS)) - .build(); + if (((Map) structure).containsKey(FunctionRSocketUtils.PAYLOAD)) { + return MessageBuilder.withPayload(((Map) structure).remove(FunctionRSocketUtils.PAYLOAD)) + .copyHeaders((Map) ((Map) structure).get(FunctionRSocketUtils.HEADERS)) + .build(); + } + else { + return MessageBuilder.withPayload(structure).build(); + } } } return value; diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ClientMessageDecoder.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java similarity index 72% rename from spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ClientMessageDecoder.java rename to spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java index 2d600df9c..f8a27e62e 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ClientMessageDecoder.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java @@ -28,6 +28,7 @@ import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.lang.Nullable; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; /** * @@ -35,17 +36,17 @@ import org.springframework.util.MimeType; * @since 3.1 * */ -class ClientMessageDecoder extends Jackson2JsonDecoder { +class MessageAwareJsonDecoder extends Jackson2JsonDecoder { private final JsonMapper jsonMapper; - ClientMessageDecoder(JsonMapper jsonMapper) { + MessageAwareJsonDecoder(JsonMapper jsonMapper) { this.jsonMapper = jsonMapper; } @Override public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { - return true; + return mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON); } @@ -56,17 +57,22 @@ class ClientMessageDecoder extends Jackson2JsonDecoder { ResolvableType type = ResolvableType.forClassWithGenerics(Map.class, String.class, Object.class); Map messageMap = (Map) super.decode(dataBuffer, type, mimeType, hints); + if (messageMap.containsKey(FunctionRSocketUtils.PAYLOAD)) { + Type requestedType = FunctionTypeUtils.getGenericType(targetType.getType()); + Object payload = this.jsonMapper.fromJson(messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType); - Type requestedType = FunctionTypeUtils.getGenericType(targetType.getType()); - Object payload = this.jsonMapper.fromJson(messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType); - - if (FunctionTypeUtils.isMessage(targetType.getType())) { - return MessageBuilder.withPayload(payload) - .copyHeaders((Map) messageMap.get(FunctionRSocketUtils.HEADERS)) - .build(); + if (FunctionTypeUtils.isMessage(targetType.getType())) { + return MessageBuilder.withPayload(payload) + .copyHeaders((Map) messageMap.get(FunctionRSocketUtils.HEADERS)) + .build(); + } + else { + return payload; + } } else { - return payload; + return messageMap; } + } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ClientMessageEncoder.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonEncoder.java similarity index 80% rename from spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ClientMessageEncoder.java rename to spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonEncoder.java index 545f054ca..3bd2f4a7b 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ClientMessageEncoder.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonEncoder.java @@ -30,7 +30,6 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; -import org.springframework.util.StreamUtils; /** * @@ -38,25 +37,29 @@ import org.springframework.util.StreamUtils; * @since 3.1 * */ -class ClientMessageEncoder extends Jackson2JsonEncoder { - - /** - * The default buffer size used by the encoder. - */ - public static final int DEFAULT_BUFFER_SIZE = StreamUtils.BUFFER_SIZE; - +class MessageAwareJsonEncoder extends Jackson2JsonEncoder { private final JsonMapper mapper; + private final boolean isClient; - ClientMessageEncoder(JsonMapper mapper) { + MessageAwareJsonEncoder(JsonMapper mapper) { + this(mapper, false); + } + + MessageAwareJsonEncoder(JsonMapper mapper, boolean isClient) { this.mapper = mapper; + this.isClient = isClient; } @Override public boolean canEncode(ResolvableType elementType, MimeType mimeType) { - return FunctionTypeUtils.isMessage(elementType.getType()) - || Map.class.isAssignableFrom(FunctionTypeUtils.getRawType(elementType.getType())); + boolean canEncode = mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON); + if (canEncode && this.isClient) { + canEncode = (FunctionTypeUtils.isMessage(elementType.getType()) + || Map.class.isAssignableFrom(FunctionTypeUtils.getRawType(elementType.getType()))); + } + return canEncode; } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java index 55baaaf17..cbca5b4b5 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketAutoConfiguration.java @@ -16,22 +16,20 @@ package org.springframework.cloud.function.rsocket; -import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.rsocket.RSocketMessageHandlerCustomizer; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; -import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.messaging.rsocket.RSocketStrategies.Builder; /** * Main configuration class for components required to support RSocket integration with @@ -48,23 +46,17 @@ import org.springframework.messaging.rsocket.RSocketStrategies; class RSocketAutoConfiguration { @Bean - public BeanPostProcessor rSocketBuilderPostProcessor(ApplicationContext applicationContext) { - return new BeanPostProcessor() { + RSocketStrategiesCustomizer rSocketStrategiesCustomizer(JsonMapper jsonMapper) { + return new RSocketStrategiesCustomizer() { @Override - public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof RSocketRequester.Builder) { - JsonMapper mapper = applicationContext.getBean(JsonMapper.class); - RSocketStrategies strategies = RSocketStrategies.builder() - .encoders(encoders -> { - encoders.add(0, new ClientMessageEncoder(mapper)); - }) - .decoders(decoders -> { - decoders.add(0, new ClientMessageDecoder(mapper)); - }) - .build(); - bean = ((RSocketRequester.Builder) bean).rsocketStrategies(strategies); - } - return bean; + public void customize(Builder strategies) { + strategies + .encoders(encoders -> { + encoders.add(0, new MessageAwareJsonEncoder(jsonMapper, true)); + }) + .decoders(decoders -> { + decoders.add(0, new MessageAwareJsonDecoder(jsonMapper)); + }); } }; } @@ -72,7 +64,7 @@ class RSocketAutoConfiguration { @Bean @ConditionalOnMissingBean @Primary - public FunctionRSocketMessageHandler functionRSocketMessageHandler(RSocketStrategies rSocketStrategies, + FunctionRSocketMessageHandler functionRSocketMessageHandler(RSocketStrategies rSocketStrategies, ObjectProvider customizers, FunctionCatalog functionCatalog, FunctionProperties functionProperties, JsonMapper jsonMapper) { diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java index d5e5feaed..23b7e0c70 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketRoutingAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ import org.springframework.messaging.rsocket.RSocketConnectorConfigurer; class RSocketRoutingAutoConfiguration { @Bean - public RSocketConnectorConfigurer functionRSocketConnectorConfigurer( + RSocketConnectorConfigurer functionRSocketConnectorConfigurer( FunctionRSocketMessageHandler handler) { return connector -> connector.acceptor(handler.responder()); } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ServerMessageEncoder.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ServerMessageEncoder.java deleted file mode 100644 index 4c0393761..000000000 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/ServerMessageEncoder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2021-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.rsocket; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.core.ResolvableType; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.http.codec.json.Jackson2JsonEncoder; -import org.springframework.lang.Nullable; -import org.springframework.messaging.Message; -import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; -/** - * - * @author Oleg Zhurakousky - * @since 3.1 - */ -class ServerMessageEncoder extends Jackson2JsonEncoder { - - private final JsonMapper mapper; - - - ServerMessageEncoder(JsonMapper mapper) { - this.mapper = mapper; - } - - @Override - public boolean canEncode(ResolvableType elementType, MimeType mimeType) { - return mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON); - } - - - @Override - public List getEncodableMimeTypes() { - return Collections.singletonList(MimeTypeUtils.APPLICATION_JSON); - } - - @Override - public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, - ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { - if (value instanceof Message) { - value = FunctionRSocketUtils.sanitizeMessageToMap((Message) value); - } - else { - if (JsonMapper.isJsonString(value)) { - value = this.mapper.fromJson(value, valueType.getType()); - } - value = Collections.singletonMap(FunctionRSocketUtils.PAYLOAD, value); - } - return super.encodeValue(value, bufferFactory, valueType, mimeType, hints); - } -} diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessagingTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessagingTests.java index 208cb4610..03752b142 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessagingTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/MessagingTests.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.function.Function; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import org.springframework.boot.WebApplicationType; @@ -226,6 +227,32 @@ public class MessagingTests { } } + @Test + public void testPojoToMessageMap() { + int port = SocketUtils.findAvailableTcpPort(); + try ( + ConfigurableApplicationContext applicationContext = + new SpringApplicationBuilder(MessagingConfiguration.class) + .web(WebApplicationType.NONE) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.rsocket.server.port=" + port); + ) { + RSocketRequester.Builder rsocketRequesterBuilder = + applicationContext.getBean(RSocketRequester.Builder.class); + Person p = new Person(); + p.setName("Ricky"); + + Message> result = rsocketRequesterBuilder.tcp("localhost", port) + .route("echoMessageMap") + .data(p) + .retrieveMono(new ParameterizedTypeReference>>() { + }) + .block(); + + assertThat(((Map) result.getPayload()).get("name")).isEqualTo("Ricky"); + } + } + @EnableAutoConfiguration @@ -239,6 +266,20 @@ public class MessagingTests { }; } + @Bean + public Function>, Message>> echoMessageMap() { + return v -> { + return v; + }; + } + + @Bean + public Function>>, Flux>>> echoMessageMapReactive() { + return v -> { + return v; + }; + } + @Bean public Function, Person> pojoMessageToPojo() { return p -> { diff --git a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java index 7866a237b..61bddb018 100644 --- a/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java +++ b/spring-cloud-function-rsocket/src/test/java/org/springframework/cloud/function/rsocket/RSocketAutoConfigurationTests.java @@ -340,6 +340,22 @@ public class RSocketAutoConfigurationTests { .expectNext("HELLO") .expectComplete() .verify(); + rsocketRequesterBuilder.tcp("localhost", port) + .route("uppercaseReactive") + .data("hello") + .retrieveFlux(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); + rsocketRequesterBuilder.tcp("localhost", port) + .route("uppercaseReactive") + .data("hello") + .retrieveFlux(String.class) + .as(StepVerifier::create) + .expectNext("HELLO") + .expectComplete() + .verify(); } }