GH-750 Add support for pluggable protobufs

This initial support adds plugin extension to support CloudEvent proto as well as the example
Additional plugins could be provided in the same ay as CloudEvent plugin extension

Resolves #750
This commit is contained in:
Oleg Zhurakousky
2021-10-11 14:03:24 +02:00
parent 346ff53539
commit 7fc755e157
41 changed files with 2659 additions and 312 deletions

View File

@@ -1046,6 +1046,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
private boolean isExtractPayload(Message<?> message, Type type) {
if (FunctionTypeUtils.isCollectionOfMessage(type)) {
return true;
}
@@ -1054,6 +1055,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
Object payload = message.getPayload();
if ((payload instanceof byte[])) {
return false;
}
if (ObjectUtils.isArray(payload)) {
payload = CollectionUtils.arrayToList(payload);
}
@@ -1072,6 +1076,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
* set as a header in a message or explicitly provided as part of the lookup.
*/
private Object convertOutputIfNecessary(Object output, Type type, String[] contentType) {
if (output instanceof Message && ((Message) output).getPayload() instanceof byte[]) {
return output;
}
if (this.skipOutputConversion) {
return output;
}
@@ -1087,6 +1094,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
Object convertedOutput = output;
if (FunctionTypeUtils.isMultipleArgumentType(type)) {
convertedOutput = this.convertMultipleOutputArgumentTypeIfNecesary(convertedOutput, type, contentType);
}

View File

@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -403,11 +404,15 @@ public class BeanFactoryAwareFunctionRegistryTests {
@SuppressWarnings("unchecked")
@Test
@Disabled
public void byteArrayNoSpecialHandling() throws Exception {
FunctionCatalog catalog = this.configureCatalog(ByteArrayFunction.class);
FunctionInvocationWrapper function = catalog.lookup("beanFactoryAwareFunctionRegistryTests.ByteArrayFunction", "application/json");
assertThat(function).isNotNull();
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("hello".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/octet-stream").build());
System.out.println(new String(result.getPayload()));
assertThat(result.getPayload()).isEqualTo("\"b2xsZWg=\"".getBytes());
}