GH-422 Add RabbitMQ instructions for Cloud Events interaction

This commit is contained in:
Oleg Zhurakousky
2020-11-11 11:53:39 +01:00
parent 97347bf30d
commit 27d0d8afd9
9 changed files with 106 additions and 52 deletions

View File

@@ -16,12 +16,14 @@
package org.springframework.cloud.function.context.catalog;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -170,7 +172,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
function = this.compose(type, functionDefinition);
}
if (function != null) {
if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) {
function.expectedOutputContentType = expectedOutputMimeTypes;
}
else if (logger.isDebugEnabled()) {
@@ -804,6 +806,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
: new OriginalMessageHolder(((Message) input).getPayload(), (Message<?>) input);
}
else if (input instanceof Message) {
if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")
&& !this.isInputTypeMessage()) { //TODO rework
return null;
}
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
if (convertedInput == null) { // give ConversionService a chance
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false);
@@ -866,7 +872,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
else if (output instanceof Collection && this.isOutputTypeMessage()) {
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType);
}
else if (!ObjectUtils.isEmpty(contentType)) {
else if (ObjectUtils.isArray(output) && !(output instanceof byte[])) {
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType);
}
else {
convertedOutput = messageConverter.toMessage(output,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType[0])));
}
@@ -1043,14 +1052,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
*/
@SuppressWarnings("unchecked")
private Object convertMultipleOutputValuesIfNecessary(Object output, String[] contentType) {
Collection outputCollection = (Collection) output;
Collection convertedOutputCollection = output instanceof List ? new ArrayList<>() : new TreeSet<>();
Collection outputCollection = ObjectUtils.isArray(output) ? Arrays.asList(output) : (Collection) output;
Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>() : new TreeSet<>();
Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType) : this.outputType;
for (Object outToConvert : outputCollection) {
Object result = this.convertOutputIfNecessary(outToConvert, this.outputType, contentType);
Assert.notNull(result, () -> "Failed to convert output '" + output + "'");
Object result = this.convertOutputIfNecessary(outToConvert, type, contentType);
Assert.notNull(result, () -> "Failed to convert output '" + outToConvert + "'");
convertedOutputCollection.add(result);
}
return convertedOutputCollection;
return ObjectUtils.isArray(output) ? convertedOutputCollection.toArray() : convertedOutputCollection;
}
/*

View File

@@ -17,13 +17,13 @@
package org.springframework.cloud.function.context.config;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
@@ -55,7 +55,9 @@ public class CloudEventJsonMessageConverter extends JsonMessageConverter {
return message.getPayload();
}
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
String jsonString = (String) message.getPayload();
String jsonString = message.getPayload() instanceof String
? (String) message.getPayload()
: new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
Map<String, Object> mapEvent = this.mapper.fromJson(jsonString, Map.class);
Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType);
mapEvent.remove("data");
@@ -63,12 +65,6 @@ public class CloudEventJsonMessageConverter extends JsonMessageConverter {
}
}
@Override
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
@Nullable Object conversionHint) {
throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress");
}
private boolean isBinary(Message<?> message) {
Map<String, Object> headers = message.getHeaders();
return headers.containsKey("source") && headers.containsKey("specversion") && headers.containsKey("type");

View File

@@ -62,11 +62,11 @@ public class BeanFactoryAwarePojoFunctionRegistryTests {
Function<Message<String>, String> f2message = catalog.lookup("myFunction");
assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE");
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunction", "application/json");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
Function<Flux<String>, Flux<String>> f3 = catalog.lookup("myFunction");
assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO");
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunction", "application/json");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
}
@Test
@@ -85,11 +85,11 @@ public class BeanFactoryAwarePojoFunctionRegistryTests {
Function<Message<String>, String> f2message = catalog.lookup("myFunctionLike");
assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE");
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunctionLike", "application/json");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
Function<Flux<String>, Flux<String>> f3 = catalog.lookup("myFunctionLike");
assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO");
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunctionLike", "application/json");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
}
@Test