GH-422 Initial support for CloudEvents

Added initial implementation of MessageConverter
At the moment there seem that MessageConverter(s) would be the only thing needed to integrate Cloud Events with various elements of Spring
This commit is contained in:
Oleg Zhurakousky
2020-11-10 14:50:09 +01:00
parent 43e1651527
commit 8a032e7ed9
4 changed files with 82 additions and 7 deletions

View File

@@ -688,6 +688,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
inputValue = this.extractValueFromOriginalValueHolderIfNecessary(value);
}
if (inputValue instanceof Message && !this.isInputTypeMessage()) {
inputValue = ((Message) inputValue).getPayload();
}
Object result = ((Function) this.target).apply(inputValue);
return value instanceof OriginalMessageHolder
@@ -985,7 +988,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
convertedInput = message;
}
else {
convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
if (!(convertedInput instanceof Message)) {
convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
}
}
}
return convertedInput;

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Map;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
/**
* Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the
* actual conversion via {@link JsonMapper} instance.
*
* @author Oleg Zhurakousky
*
* @since 3.1.0
*/
public class CloudEventJsonMessageConverter extends JsonMessageConverter {
private final JsonMapper mapper;
public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
super(jsonMapper, new MimeType("application", "cloudevents+json"));
this.mapper = jsonMapper;
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
String jsonString = (String) message.getPayload();
Map<String, Object> mapEvent = this.mapper.fromJson(jsonString, Map.class);
Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType);
mapEvent.remove("data");
return MessageBuilder.withPayload(payload).copyHeaders(mapEvent).build();
}
@Override
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
@Nullable Object conversionHint) {
throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress");
}
}

View File

@@ -97,11 +97,11 @@ public class ContextFunctionCatalogAutoConfiguration {
.collect(Collectors.toList());
mcList.add(new JsonMessageConverter(jsonMapper));
mcList.add(new CloudEventJsonMessageConverter(jsonMapper));
mcList.add(new ByteArrayMessageConverter());
mcList.add(new StringMessageConverter());
mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService));
if (!CollectionUtils.isEmpty(mcList)) {
messageConverter = new SmartCompositeMessageConverter(mcList);
}

View File

@@ -78,14 +78,20 @@ public class JsonMessageConverter extends AbstractMessageConverter {
return message.getPayload();
}
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
try {
return this.jsonMapper.fromJson(message.getPayload(), convertToType);
if (targetClass == byte[].class && message.getPayload() instanceof String) {
return ((String) message.getPayload()).getBytes(StandardCharsets.UTF_8);
}
catch (Exception e) {
if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) {
return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
else {
try {
return this.jsonMapper.fromJson(message.getPayload(), convertToType);
}
catch (Exception e) {
if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) {
return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
}
}
}
return null;
}