GH-663 Fix how strtategies are used to register additional encoders/decoders

Resolves #663
This commit is contained in:
Oleg Zhurakousky
2021-03-05 08:04:53 +01:00
parent 046aa354da
commit fcd427ca29
8 changed files with 114 additions and 120 deletions

View File

@@ -108,9 +108,11 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void afterPropertiesSet() {
setEncoders(Collections.singletonList(new ServerMessageEncoder(this.jsonMapper)));
List encoders = this.getEncoders();
encoders.set(0, new MessageAwareJsonEncoder(this.jsonMapper));
super.afterPropertiesSet();
}
@@ -248,9 +250,14 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
// could be array, map or string
Object structure = this.jsonMapper.fromJson(value, Object.class);
if (structure instanceof Map) {
return MessageBuilder.withPayload(((Map<String, ?>) structure).remove(FunctionRSocketUtils.PAYLOAD))
.copyHeaders((Map<String, ?>) ((Map<String, ?>) structure).get(FunctionRSocketUtils.HEADERS))
.build();
if (((Map<String, ?>) structure).containsKey(FunctionRSocketUtils.PAYLOAD)) {
return MessageBuilder.withPayload(((Map<String, ?>) structure).remove(FunctionRSocketUtils.PAYLOAD))
.copyHeaders((Map<String, ?>) ((Map<String, ?>) structure).get(FunctionRSocketUtils.HEADERS))
.build();
}
else {
return MessageBuilder.withPayload(structure).build();
}
}
}
return value;

View File

@@ -28,6 +28,7 @@ import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
*
@@ -35,17 +36,17 @@ import org.springframework.util.MimeType;
* @since 3.1
*
*/
class ClientMessageDecoder extends Jackson2JsonDecoder {
class MessageAwareJsonDecoder extends Jackson2JsonDecoder {
private final JsonMapper jsonMapper;
ClientMessageDecoder(JsonMapper jsonMapper) {
MessageAwareJsonDecoder(JsonMapper jsonMapper) {
this.jsonMapper = jsonMapper;
}
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
return true;
return mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
}
@@ -56,17 +57,22 @@ class ClientMessageDecoder extends Jackson2JsonDecoder {
ResolvableType type = ResolvableType.forClassWithGenerics(Map.class, String.class, Object.class);
Map<String, Object> messageMap = (Map<String, Object>) super.decode(dataBuffer, type, mimeType, hints);
if (messageMap.containsKey(FunctionRSocketUtils.PAYLOAD)) {
Type requestedType = FunctionTypeUtils.getGenericType(targetType.getType());
Object payload = this.jsonMapper.fromJson(messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType);
Type requestedType = FunctionTypeUtils.getGenericType(targetType.getType());
Object payload = this.jsonMapper.fromJson(messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType);
if (FunctionTypeUtils.isMessage(targetType.getType())) {
return MessageBuilder.withPayload(payload)
.copyHeaders((Map<String, ?>) messageMap.get(FunctionRSocketUtils.HEADERS))
.build();
if (FunctionTypeUtils.isMessage(targetType.getType())) {
return MessageBuilder.withPayload(payload)
.copyHeaders((Map<String, ?>) messageMap.get(FunctionRSocketUtils.HEADERS))
.build();
}
else {
return payload;
}
}
else {
return payload;
return messageMap;
}
}
}

View File

@@ -30,7 +30,6 @@ import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StreamUtils;
/**
*
@@ -38,25 +37,29 @@ import org.springframework.util.StreamUtils;
* @since 3.1
*
*/
class ClientMessageEncoder extends Jackson2JsonEncoder {
/**
* The default buffer size used by the encoder.
*/
public static final int DEFAULT_BUFFER_SIZE = StreamUtils.BUFFER_SIZE;
class MessageAwareJsonEncoder extends Jackson2JsonEncoder {
private final JsonMapper mapper;
private final boolean isClient;
ClientMessageEncoder(JsonMapper mapper) {
MessageAwareJsonEncoder(JsonMapper mapper) {
this(mapper, false);
}
MessageAwareJsonEncoder(JsonMapper mapper, boolean isClient) {
this.mapper = mapper;
this.isClient = isClient;
}
@Override
public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
return FunctionTypeUtils.isMessage(elementType.getType())
|| Map.class.isAssignableFrom(FunctionTypeUtils.getRawType(elementType.getType()));
boolean canEncode = mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
if (canEncode && this.isClient) {
canEncode = (FunctionTypeUtils.isMessage(elementType.getType())
|| Map.class.isAssignableFrom(FunctionTypeUtils.getRawType(elementType.getType())));
}
return canEncode;
}

View File

@@ -16,22 +16,20 @@
package org.springframework.cloud.function.rsocket;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.rsocket.RSocketMessageHandlerCustomizer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.RSocketStrategies.Builder;
/**
* Main configuration class for components required to support RSocket integration with
@@ -48,23 +46,17 @@ import org.springframework.messaging.rsocket.RSocketStrategies;
class RSocketAutoConfiguration {
@Bean
public BeanPostProcessor rSocketBuilderPostProcessor(ApplicationContext applicationContext) {
return new BeanPostProcessor() {
RSocketStrategiesCustomizer rSocketStrategiesCustomizer(JsonMapper jsonMapper) {
return new RSocketStrategiesCustomizer() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RSocketRequester.Builder) {
JsonMapper mapper = applicationContext.getBean(JsonMapper.class);
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> {
encoders.add(0, new ClientMessageEncoder(mapper));
})
.decoders(decoders -> {
decoders.add(0, new ClientMessageDecoder(mapper));
})
.build();
bean = ((RSocketRequester.Builder) bean).rsocketStrategies(strategies);
}
return bean;
public void customize(Builder strategies) {
strategies
.encoders(encoders -> {
encoders.add(0, new MessageAwareJsonEncoder(jsonMapper, true));
})
.decoders(decoders -> {
decoders.add(0, new MessageAwareJsonDecoder(jsonMapper));
});
}
};
}
@@ -72,7 +64,7 @@ class RSocketAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@Primary
public FunctionRSocketMessageHandler functionRSocketMessageHandler(RSocketStrategies rSocketStrategies,
FunctionRSocketMessageHandler functionRSocketMessageHandler(RSocketStrategies rSocketStrategies,
ObjectProvider<RSocketMessageHandlerCustomizer> customizers, FunctionCatalog functionCatalog,
FunctionProperties functionProperties, JsonMapper jsonMapper) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@ import org.springframework.messaging.rsocket.RSocketConnectorConfigurer;
class RSocketRoutingAutoConfiguration {
@Bean
public RSocketConnectorConfigurer functionRSocketConnectorConfigurer(
RSocketConnectorConfigurer functionRSocketConnectorConfigurer(
FunctionRSocketMessageHandler handler) {
return connector -> connector.acceptor(handler.responder());
}

View File

@@ -1,71 +0,0 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.rsocket;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
class ServerMessageEncoder extends Jackson2JsonEncoder {
private final JsonMapper mapper;
ServerMessageEncoder(JsonMapper mapper) {
this.mapper = mapper;
}
@Override
public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
return mimeType.isCompatibleWith(MimeTypeUtils.APPLICATION_JSON);
}
@Override
public List<MimeType> getEncodableMimeTypes() {
return Collections.singletonList(MimeTypeUtils.APPLICATION_JSON);
}
@Override
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (value instanceof Message) {
value = FunctionRSocketUtils.sanitizeMessageToMap((Message<?>) value);
}
else {
if (JsonMapper.isJsonString(value)) {
value = this.mapper.fromJson(value, valueType.getType());
}
value = Collections.singletonMap(FunctionRSocketUtils.PAYLOAD, value);
}
return super.encodeValue(value, bufferFactory, valueType, mimeType, hints);
}
}

View File

@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.boot.WebApplicationType;
@@ -226,6 +227,32 @@ public class MessagingTests {
}
}
@Test
public void testPojoToMessageMap() {
int port = SocketUtils.findAvailableTcpPort();
try (
ConfigurableApplicationContext applicationContext =
new SpringApplicationBuilder(MessagingConfiguration.class)
.web(WebApplicationType.NONE)
.run("--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.rsocket.server.port=" + port);
) {
RSocketRequester.Builder rsocketRequesterBuilder =
applicationContext.getBean(RSocketRequester.Builder.class);
Person p = new Person();
p.setName("Ricky");
Message<Map<String, Object>> result = rsocketRequesterBuilder.tcp("localhost", port)
.route("echoMessageMap")
.data(p)
.retrieveMono(new ParameterizedTypeReference<Message<Map<String, Object>>>() {
})
.block();
assertThat(((Map) result.getPayload()).get("name")).isEqualTo("Ricky");
}
}
@EnableAutoConfiguration
@@ -239,6 +266,20 @@ public class MessagingTests {
};
}
@Bean
public Function<Message<Map<String, Object>>, Message<Map<String, Object>>> echoMessageMap() {
return v -> {
return v;
};
}
@Bean
public Function<Flux<Message<Map<String, Object>>>, Flux<Message<Map<String, Object>>>> echoMessageMapReactive() {
return v -> {
return v;
};
}
@Bean
public Function<Message<Person>, Person> pojoMessageToPojo() {
return p -> {

View File

@@ -340,6 +340,22 @@ public class RSocketAutoConfigurationTests {
.expectNext("HELLO")
.expectComplete()
.verify();
rsocketRequesterBuilder.tcp("localhost", port)
.route("uppercaseReactive")
.data("hello")
.retrieveFlux(String.class)
.as(StepVerifier::create)
.expectNext("HELLO")
.expectComplete()
.verify();
rsocketRequesterBuilder.tcp("localhost", port)
.route("uppercaseReactive")
.data("hello")
.retrieveFlux(String.class)
.as(StepVerifier::create)
.expectNext("HELLO")
.expectComplete()
.verify();
}
}