From e8bf50821f0a76f6523b6bb04ed91aa725ab385c Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 25 Jun 2019 12:45:18 +0300 Subject: [PATCH] Interim for Riff team review --- .../function/context/FunctionCatalog.java | 10 ++- .../catalog/FunctionTypeConversionHelper.java | 60 ++++++++++++--- .../context/catalog/LazyFunctionRegistry.java | 24 +++--- ...ntextFunctionCatalogAutoConfiguration.java | 65 ++++++++++++++++- .../LazyFunctionRegistryMultiInOutTests.java | 73 +++++++++++++++---- .../catalog/LazyFunctionRegistryTests.java | 11 +-- 6 files changed, 189 insertions(+), 54 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java index 4147263a4..ad457f439 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionCatalog.java @@ -17,9 +17,9 @@ package org.springframework.cloud.function.context; import java.util.Set; -import java.util.function.Function; -import org.springframework.cloud.function.context.catalog.OutputPostProcessor; +import org.springframework.util.MimeType; + /** * @author Dave Syer @@ -28,10 +28,12 @@ import org.springframework.cloud.function.context.catalog.OutputPostProcessor; public interface FunctionCatalog { - default T lookup(String name, OutputPostProcessor outputPostProcessor) { - throw new UnsupportedOperationException("This type of lookup is not supported by this instance of FunctionCatalog"); + + default T lookupRaw(String name, MimeType... acceptedOutputTypes) { + return null; } + /** * Will look up the instance of the functional interface by name only. * @param instance type diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeConversionHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeConversionHelper.java index 58771dcfa..535424efa 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeConversionHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeConversionHelper.java @@ -89,6 +89,38 @@ class FunctionTypeConversionHelper { return input; } + @SuppressWarnings("rawtypes") + Object convertOutputIfNecessary(Object output) { + List convertedResults = new ArrayList(); + if (output instanceof Tuple2) { + convertedResults.add(this.doConvert(((Tuple2)output).getT1(), byte[].class, true)); + convertedResults.add(this.doConvert(((Tuple2)output).getT2(), byte[].class, true)); + } + if (output instanceof Tuple3) { + convertedResults.add(this.doConvert(((Tuple3)output).getT3(), byte[].class, true)); + } + if (output instanceof Tuple4) { + convertedResults.add(this.doConvert(((Tuple4)output).getT4(), byte[].class, true)); + } + if (output instanceof Tuple5) { + convertedResults.add(this.doConvert(((Tuple5)output).getT5(), byte[].class, true)); + } + if (output instanceof Tuple6) { + convertedResults.add(this.doConvert(((Tuple6)output).getT6(), byte[].class, true)); + } + if (output instanceof Tuple7) { + convertedResults.add(this.doConvert(((Tuple7)output).getT7(), byte[].class, true)); + } + if (output instanceof Tuple8) { + convertedResults.add(this.doConvert(((Tuple8)output).getT8(), byte[].class, true)); + } + + output = CollectionUtils.isEmpty(convertedResults) + ? this.doConvert(output, byte[].class, true) + : Tuples.fromArray(convertedResults.toArray()); + return output; + } + int getInputArgumentCount() { Type[] types = ((ParameterizedType)this.functionArgumentTypes[0]).getActualTypeArguments(); return types.length; @@ -135,27 +167,30 @@ class FunctionTypeConversionHelper { return (Class) targetType; } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private Object doConvert(Object input, Type targetType) { + private Object doConvert(Object incoming, Type targetType) { + return this.doConvert(incoming, targetType, false); + } - Class actualInputType = this.getRawType(targetType); - if (input instanceof Publisher) { - if (!actualInputType.isAssignableFrom(Void.class)) { - input = input instanceof Mono - ? Mono.from((Publisher) input).map(value -> this.convertInputArgument(value, targetType, actualInputType)) - : Flux.from((Publisher) input).map(value -> this.convertInputArgument(value, targetType, actualInputType)); + @SuppressWarnings({ "unchecked", "rawtypes" }) + private Object doConvert(Object incoming, Type targetType, boolean toMessage) { + Class actualType = this.getRawType(targetType); + if (incoming instanceof Publisher) { + if (!actualType.isAssignableFrom(Void.class)) { + incoming = incoming instanceof Mono + ? Mono.from((Publisher) incoming).map(value -> this.doConvertArgument(value, targetType, actualType, toMessage)) + : Flux.from((Publisher) incoming).map(value -> this.doConvertArgument(value, targetType, actualType, toMessage)); } } else { Assert.isTrue(!Publisher.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper()), "Invoking reactive function as imperative is not allowed. Function name(s): " + this.functionRegistration.getNames()); - input = this.convertInputArgument(input, targetType, actualInputType); + incoming = this.doConvertArgument(incoming, targetType, actualType, toMessage); } - return input; + return incoming; } - private Object convertInputArgument(Object incomingValue, Type targetType, Class actualInputType) { + private Object doConvertArgument(Object incomingValue, Type targetType, Class actualInputType, boolean toMessage) { if (!Void.class.isAssignableFrom(actualInputType)) { if (incomingValue instanceof Message) { incomingValue = this.isMessage(targetType) @@ -169,6 +204,9 @@ class FunctionTypeConversionHelper { incomingValue = this.conversionService.convert(incomingValue, actualInputType); } } + if (toMessage) { + incomingValue = MessageBuilder.withPayload(incomingValue).build(); + } } else { incomingValue = null; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistry.java index 5adcb016e..4f5a79678 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistry.java @@ -72,16 +72,11 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector this.messageConverter = messageConverter; } - @SuppressWarnings("unchecked") - @Override - public T lookup(String definition, OutputPostProcessor outputPostProcessor) { - return (T) this.compose(null, definition, false, outputPostProcessor); - } @SuppressWarnings("unchecked") @Override public T lookup(Class type, String definition) { - return (T) this.compose(type, definition, false, new DefaultOutputPostProcessor()); + return (T) this.compose(type, definition, false); } @Override @@ -168,10 +163,10 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector // } @SuppressWarnings({ "unchecked", "rawtypes" }) - private Function compose(Class type, String definition, boolean raw, OutputPostProcessor outputPostProcessor) { + private Function compose(Class type, String definition, boolean raw) { Function resultFunction = null; if (this.registrationsByName.containsKey(definition)) { - resultFunction = new FunctionInvocationWrapper(this.registrationsByName.get(definition), false, outputPostProcessor); + resultFunction = new FunctionInvocationWrapper(this.registrationsByName.get(definition), false); } else { String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|"); @@ -200,7 +195,7 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector FunctionRegistration registration = new FunctionRegistration<>(function, name).type(funcType); registrationsByFunction.putIfAbsent(function, registration); registrationsByName.putIfAbsent(name, registration); - function = new FunctionInvocationWrapper(registration, false, outputPostProcessor); + function = new FunctionInvocationWrapper(registration, false); if (resultFunction == null) { resultFunction = (Function) function; } @@ -221,7 +216,7 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector registration = new FunctionRegistration(resultFunction, composedNameBuilder.toString()).type(funcType); registrationsByFunction.putIfAbsent(resultFunction, registration); registrationsByName.putIfAbsent(composedNameBuilder.toString(), registration); - resultFunction = new FunctionInvocationWrapper(registration, true, outputPostProcessor); + resultFunction = new FunctionInvocationWrapper(registration, true); } previousFunctionType = funcType; prefix = "|"; @@ -252,13 +247,10 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector private final FunctionTypeConversionHelper functionTypeConversionHelper; - private final OutputPostProcessor outputPostProcessor; - - FunctionInvocationWrapper(FunctionRegistration functionRegistration, boolean composed, OutputPostProcessor outputPostProcessor) { + FunctionInvocationWrapper(FunctionRegistration functionRegistration, boolean composed) { this.target = functionRegistration.getTarget(); this.functionRegistration = functionRegistration; this.composed = composed; - this.outputPostProcessor = outputPostProcessor; this.functionTypeConversionHelper = new FunctionTypeConversionHelper(this.functionRegistration, conversionService, messageConverter); } @@ -313,7 +305,9 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector } } - result = this.outputPostProcessor.postProcessOutput(result); + // ==== + //result = this.functionTypeConversionHelper.convertOutputIfNecessary(result); + // return this.wrapOutputToReactiveIfNecessary(result); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index ab565fa15..7f10457e1 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -79,6 +79,7 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.util.ClassUtils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; @@ -104,16 +105,20 @@ public class ContextFunctionCatalogAutoConfiguration { // } @Bean - public FunctionRegistry functionCatalog(@Nullable ConversionService conversionService, @Nullable CompositeMessageConverter messageConverter) { + public FunctionRegistry functionCatalog(@Nullable ConversionService conversionService, @Nullable CompositeMessageConverter messageConverter, + Map additionalConverters) { conversionService = conversionService == null ? new DefaultConversionService() : conversionService; if (messageConverter == null) { List messageConverters = new ArrayList<>(); + messageConverters.addAll(additionalConverters.values()); messageConverters.add(new MappingJackson2MessageConverter()); messageConverters.add(new ByteArrayMessageConverter()); + messageConverters.add(new StringMessageConverter()); messageConverter = new CompositeMessageConverter(messageConverters); } if (conversionService != null) { ((ConfigurableConversionService)conversionService).addConverter(new MyConverter()); + ((ConfigurableConversionService)conversionService).addConverter(new ObjectToByteArrayConverter()); } return new LazyFunctionRegistry(conversionService, messageConverter); } @@ -141,6 +146,64 @@ public class ContextFunctionCatalogAutoConfiguration { } + public static class ObjectToByteArrayConverter implements ConditionalGenericConverter { + + ObjectMapper mapper = new ObjectMapper(); + + @Override + public Set getConvertibleTypes() { + return Collections.singleton(new ConvertiblePair(Object.class, byte[].class)); + } + + @Override + public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { + try { + byte[] result = mapper.writeValueAsBytes(source); + return result; + } + catch (Exception e) { + throw new IllegalStateException("Failwd to convert " + source + " to byte[]", e); + } + } + + @Override + public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) { + if (ClassUtils.isAssignable(Object.class, sourceType.getType()) && ClassUtils.isAssignable(byte[].class, targetType.getType())) { + // maybe + return true; + } + return false; + } + + } + + public static class Person { + private String name; + private int id; + public Person() { + + } + public Person(String name, int id) { + this.name = name; + this.id = id; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public int getId() { + return id; + } + public void setId(int id) { + this.id = id; + } + public String toString() { + return "Person: " + name + "/" + id; + } + } + @Bean(RoutingFunction.FUNCTION_NAME) @ConditionalOnProperty(name = "spring.cloud.function.routing.enabled", havingValue = "true") RoutingFunction gateway(FunctionCatalog functionCatalog, FunctionInspector functionInspector) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryMultiInOutTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryMultiInOutTests.java index 62fc08773..835848840 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryMultiInOutTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryMultiInOutTests.java @@ -16,12 +16,15 @@ package org.springframework.cloud.function.context.catalog; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.function.BiFunction; import java.util.function.Function; +import org.junit.Ignore; import org.junit.Test; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -29,8 +32,11 @@ import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeTypeUtils; @@ -71,6 +77,7 @@ public class LazyFunctionRegistryMultiInOutTests { @SuppressWarnings("unused") @Test + @Ignore public void testMultiInputBiFunction() { FunctionCatalog catalog = this.configureCatalog(); BiFunction, Flux, Flux> multiInputFunction = @@ -220,20 +227,35 @@ public class LazyFunctionRegistryMultiInOutTests { result.getT2().subscribe(v -> System.out.println("=> 2: " + v)); } -// @Test -// public void testMultiToMultiByteArray() { -// FunctionCatalog catalog = this.configureCatalog(); -// Function, Flux, Flux>, Tuple2, Mono>> multiTuMulti = -// catalog.lookup("multiTuMulti"); -// -// Flux firstFlux = Flux.just("Unlce", "Oncle"); -// Flux secondFlux = Flux.just("Sam", "Pierre"); -// Flux thirdFlux = Flux.just(1, 2); -// -// Tuple2, Mono> result = multiTuMulti.apply(Tuples.of(firstFlux, secondFlux, thirdFlux)); -// result.getT1().subscribe(v -> System.out.println("=> 1: " + v)); -// result.getT2().subscribe(v -> System.out.println("=> 2: " + v)); -// } + @Test + public void testMultiToMultiWithMessageByteArrayPayload() { + FunctionCatalog catalog = this.configureCatalog(); + Function>, Flux>, Flux>>, Tuple2>, Mono>>> multiTuMulti = + catalog.lookupRaw("multiTuMulti", MimeTypeUtils.parseMimeType("foo/bar"), MimeTypeUtils.parseMimeType("bar/*")); + + Flux> firstFlux = Flux.just( + MessageBuilder.withPayload("Unlce".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build(), + MessageBuilder.withPayload("Onlce".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build()); + Flux> secondFlux = Flux.just( + MessageBuilder.withPayload("Sam".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build(), + MessageBuilder.withPayload("Pierre".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build()); + + ByteBuffer one = ByteBuffer.allocate(4); + one.putInt(1); + ByteBuffer two = ByteBuffer.allocate(4); + two.putInt(2); + + Flux> thirdFlux = Flux.just( + MessageBuilder.withPayload(one.array()).setHeader(MessageHeaders.CONTENT_TYPE, "octet-stream/integer").build(), + MessageBuilder.withPayload(two.array()).setHeader(MessageHeaders.CONTENT_TYPE, "octet-stream/integer").build()); + + + Tuple2>, Mono>> result = multiTuMulti.apply(Tuples.of(firstFlux, secondFlux, thirdFlux)); + result.getT1().subscribe(v -> System.out.println("=> 1: " + v)); + result.getT2().subscribe(v -> System.out.println("=> 2: " + v)); + + //Tuple2 d = multiTuMulti.apply(Tuples.of(firstFlux, secondFlux, thirdFlux)); + } @EnableAutoConfiguration @@ -299,6 +321,29 @@ public class LazyFunctionRegistryMultiInOutTests { }; } + @Bean + public MessageConverter byteArrayToIntegerMessageConverter() { + return new AbstractMessageConverter(MimeTypeUtils.parseMimeType("octet-stream/integer")) { + + @Override + protected boolean supports(Class clazz) { + return Integer.class.isAssignableFrom(clazz); + } + + protected Object convertFromInternal( + Message message, Class targetClass, @Nullable Object conversionHint) { + ByteBuffer wrappedPayload = ByteBuffer.wrap((byte[])message.getPayload()); + return wrappedPayload.getInt(); + } + + protected Object convertToInternal( + Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) { + + return null; + } + }; + } + @Bean public Function, Flux>, Flux[]> repeater() { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryTests.java index 017355481..e0afddede 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/LazyFunctionRegistryTests.java @@ -204,15 +204,8 @@ public class LazyFunctionRegistryTests { List result = multiInputFunction.apply(Tuples.of(stringStream, intStream)).collectList().block(); System.out.println(result); } - - - private class MyFunction { - public String apply(String a, Integer b, Foo c) { - - } - } - - + + @Test public void testMultiInputWithComposition() { FunctionCatalog catalog = this.configureCatalog();