Interim for Riff team review

This commit is contained in:
Oleg Zhurakousky
2019-06-25 12:45:18 +03:00
parent 98bdd8237a
commit e8bf50821f
6 changed files with 189 additions and 54 deletions

View File

@@ -17,9 +17,9 @@
package org.springframework.cloud.function.context;
import java.util.Set;
import java.util.function.Function;
import org.springframework.cloud.function.context.catalog.OutputPostProcessor;
import org.springframework.util.MimeType;
/**
* @author Dave Syer
@@ -28,10 +28,12 @@ import org.springframework.cloud.function.context.catalog.OutputPostProcessor;
public interface FunctionCatalog {
default <T> T lookup(String name, OutputPostProcessor outputPostProcessor) {
throw new UnsupportedOperationException("This type of lookup is not supported by this instance of FunctionCatalog");
default <T> T lookupRaw(String name, MimeType... acceptedOutputTypes) {
return null;
}
/**
* Will look up the instance of the functional interface by name only.
* @param <T> instance type

View File

@@ -89,6 +89,38 @@ class FunctionTypeConversionHelper {
return input;
}
@SuppressWarnings("rawtypes")
Object convertOutputIfNecessary(Object output) {
List<Object> convertedResults = new ArrayList<Object>();
if (output instanceof Tuple2) {
convertedResults.add(this.doConvert(((Tuple2)output).getT1(), byte[].class, true));
convertedResults.add(this.doConvert(((Tuple2)output).getT2(), byte[].class, true));
}
if (output instanceof Tuple3) {
convertedResults.add(this.doConvert(((Tuple3)output).getT3(), byte[].class, true));
}
if (output instanceof Tuple4) {
convertedResults.add(this.doConvert(((Tuple4)output).getT4(), byte[].class, true));
}
if (output instanceof Tuple5) {
convertedResults.add(this.doConvert(((Tuple5)output).getT5(), byte[].class, true));
}
if (output instanceof Tuple6) {
convertedResults.add(this.doConvert(((Tuple6)output).getT6(), byte[].class, true));
}
if (output instanceof Tuple7) {
convertedResults.add(this.doConvert(((Tuple7)output).getT7(), byte[].class, true));
}
if (output instanceof Tuple8) {
convertedResults.add(this.doConvert(((Tuple8)output).getT8(), byte[].class, true));
}
output = CollectionUtils.isEmpty(convertedResults)
? this.doConvert(output, byte[].class, true)
: Tuples.fromArray(convertedResults.toArray());
return output;
}
int getInputArgumentCount() {
Type[] types = ((ParameterizedType)this.functionArgumentTypes[0]).getActualTypeArguments();
return types.length;
@@ -135,27 +167,30 @@ class FunctionTypeConversionHelper {
return (Class<?>) targetType;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object doConvert(Object input, Type targetType) {
private Object doConvert(Object incoming, Type targetType) {
return this.doConvert(incoming, targetType, false);
}
Class<?> actualInputType = this.getRawType(targetType);
if (input instanceof Publisher) {
if (!actualInputType.isAssignableFrom(Void.class)) {
input = input instanceof Mono
? Mono.from((Publisher) input).map(value -> this.convertInputArgument(value, targetType, actualInputType))
: Flux.from((Publisher) input).map(value -> this.convertInputArgument(value, targetType, actualInputType));
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object doConvert(Object incoming, Type targetType, boolean toMessage) {
Class<?> actualType = this.getRawType(targetType);
if (incoming instanceof Publisher) {
if (!actualType.isAssignableFrom(Void.class)) {
incoming = incoming instanceof Mono
? Mono.from((Publisher) incoming).map(value -> this.doConvertArgument(value, targetType, actualType, toMessage))
: Flux.from((Publisher) incoming).map(value -> this.doConvertArgument(value, targetType, actualType, toMessage));
}
}
else {
Assert.isTrue(!Publisher.class.isAssignableFrom(this.functionRegistration.getType().getInputWrapper()),
"Invoking reactive function as imperative is not allowed. Function name(s): "
+ this.functionRegistration.getNames());
input = this.convertInputArgument(input, targetType, actualInputType);
incoming = this.doConvertArgument(incoming, targetType, actualType, toMessage);
}
return input;
return incoming;
}
private Object convertInputArgument(Object incomingValue, Type targetType, Class<?> actualInputType) {
private Object doConvertArgument(Object incomingValue, Type targetType, Class<?> actualInputType, boolean toMessage) {
if (!Void.class.isAssignableFrom(actualInputType)) {
if (incomingValue instanceof Message<?>) {
incomingValue = this.isMessage(targetType)
@@ -169,6 +204,9 @@ class FunctionTypeConversionHelper {
incomingValue = this.conversionService.convert(incomingValue, actualInputType);
}
}
if (toMessage) {
incomingValue = MessageBuilder.withPayload(incomingValue).build();
}
}
else {
incomingValue = null;

View File

@@ -72,16 +72,11 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector
this.messageConverter = messageConverter;
}
@SuppressWarnings("unchecked")
@Override
public <T> T lookup(String definition, OutputPostProcessor outputPostProcessor) {
return (T) this.compose(null, definition, false, outputPostProcessor);
}
@SuppressWarnings("unchecked")
@Override
public <T> T lookup(Class<?> type, String definition) {
return (T) this.compose(type, definition, false, new DefaultOutputPostProcessor());
return (T) this.compose(type, definition, false);
}
@Override
@@ -168,10 +163,10 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector
// }
@SuppressWarnings({ "unchecked", "rawtypes" })
private Function<?,?> compose(Class<?> type, String definition, boolean raw, OutputPostProcessor outputPostProcessor) {
private Function<?,?> compose(Class<?> type, String definition, boolean raw) {
Function<?,?> resultFunction = null;
if (this.registrationsByName.containsKey(definition)) {
resultFunction = new FunctionInvocationWrapper(this.registrationsByName.get(definition), false, outputPostProcessor);
resultFunction = new FunctionInvocationWrapper(this.registrationsByName.get(definition), false);
}
else {
String[] names = StringUtils.delimitedListToStringArray(definition.replaceAll(",", "|").trim(), "|");
@@ -200,7 +195,7 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector
FunctionRegistration<Object> registration = new FunctionRegistration<>(function, name).type(funcType);
registrationsByFunction.putIfAbsent(function, registration);
registrationsByName.putIfAbsent(name, registration);
function = new FunctionInvocationWrapper(registration, false, outputPostProcessor);
function = new FunctionInvocationWrapper(registration, false);
if (resultFunction == null) {
resultFunction = (Function<?,?>) function;
}
@@ -221,7 +216,7 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector
registration = new FunctionRegistration<Object>(resultFunction, composedNameBuilder.toString()).type(funcType);
registrationsByFunction.putIfAbsent(resultFunction, registration);
registrationsByName.putIfAbsent(composedNameBuilder.toString(), registration);
resultFunction = new FunctionInvocationWrapper(registration, true, outputPostProcessor);
resultFunction = new FunctionInvocationWrapper(registration, true);
}
previousFunctionType = funcType;
prefix = "|";
@@ -252,13 +247,10 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector
private final FunctionTypeConversionHelper functionTypeConversionHelper;
private final OutputPostProcessor outputPostProcessor;
FunctionInvocationWrapper(FunctionRegistration<?> functionRegistration, boolean composed, OutputPostProcessor outputPostProcessor) {
FunctionInvocationWrapper(FunctionRegistration<?> functionRegistration, boolean composed) {
this.target = functionRegistration.getTarget();
this.functionRegistration = functionRegistration;
this.composed = composed;
this.outputPostProcessor = outputPostProcessor;
this.functionTypeConversionHelper = new FunctionTypeConversionHelper(this.functionRegistration,
conversionService, messageConverter);
}
@@ -313,7 +305,9 @@ public class LazyFunctionRegistry implements FunctionRegistry, FunctionInspector
}
}
result = this.outputPostProcessor.postProcessOutput(result);
// ====
//result = this.functionTypeConversionHelper.convertOutputIfNecessary(result);
//
return this.wrapOutputToReactiveIfNecessary(result);
}

View File

@@ -79,6 +79,7 @@ import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.util.ClassUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
@@ -104,16 +105,20 @@ public class ContextFunctionCatalogAutoConfiguration {
// }
@Bean
public FunctionRegistry functionCatalog(@Nullable ConversionService conversionService, @Nullable CompositeMessageConverter messageConverter) {
public FunctionRegistry functionCatalog(@Nullable ConversionService conversionService, @Nullable CompositeMessageConverter messageConverter,
Map<String, MessageConverter> additionalConverters) {
conversionService = conversionService == null ? new DefaultConversionService() : conversionService;
if (messageConverter == null) {
List<MessageConverter> messageConverters = new ArrayList<>();
messageConverters.addAll(additionalConverters.values());
messageConverters.add(new MappingJackson2MessageConverter());
messageConverters.add(new ByteArrayMessageConverter());
messageConverters.add(new StringMessageConverter());
messageConverter = new CompositeMessageConverter(messageConverters);
}
if (conversionService != null) {
((ConfigurableConversionService)conversionService).addConverter(new MyConverter());
((ConfigurableConversionService)conversionService).addConverter(new ObjectToByteArrayConverter());
}
return new LazyFunctionRegistry(conversionService, messageConverter);
}
@@ -141,6 +146,64 @@ public class ContextFunctionCatalogAutoConfiguration {
}
public static class ObjectToByteArrayConverter implements ConditionalGenericConverter {
ObjectMapper mapper = new ObjectMapper();
@Override
public Set<ConvertiblePair> getConvertibleTypes() {
return Collections.singleton(new ConvertiblePair(Object.class, byte[].class));
}
@Override
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
try {
byte[] result = mapper.writeValueAsBytes(source);
return result;
}
catch (Exception e) {
throw new IllegalStateException("Failwd to convert " + source + " to byte[]", e);
}
}
@Override
public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
if (ClassUtils.isAssignable(Object.class, sourceType.getType()) && ClassUtils.isAssignable(byte[].class, targetType.getType())) {
// maybe
return true;
}
return false;
}
}
public static class Person {
private String name;
private int id;
public Person() {
}
public Person(String name, int id) {
this.name = name;
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String toString() {
return "Person: " + name + "/" + id;
}
}
@Bean(RoutingFunction.FUNCTION_NAME)
@ConditionalOnProperty(name = "spring.cloud.function.routing.enabled", havingValue = "true")
RoutingFunction gateway(FunctionCatalog functionCatalog, FunctionInspector functionInspector) {

View File

@@ -16,12 +16,15 @@
package org.springframework.cloud.function.context.catalog;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -29,8 +32,11 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
@@ -71,6 +77,7 @@ public class LazyFunctionRegistryMultiInOutTests {
@SuppressWarnings("unused")
@Test
@Ignore
public void testMultiInputBiFunction() {
FunctionCatalog catalog = this.configureCatalog();
BiFunction<Flux<String>, Flux<Integer>, Flux<String>> multiInputFunction =
@@ -220,20 +227,35 @@ public class LazyFunctionRegistryMultiInOutTests {
result.getT2().subscribe(v -> System.out.println("=> 2: " + v));
}
// @Test
// public void testMultiToMultiByteArray() {
// FunctionCatalog catalog = this.configureCatalog();
// Function<Tuple3<Flux<String>, Flux<String>, Flux<Integer>>, Tuple2<Flux<Person>, Mono<Long>>> multiTuMulti =
// catalog.lookup("multiTuMulti");
//
// Flux<String> firstFlux = Flux.just("Unlce", "Oncle");
// Flux<String> secondFlux = Flux.just("Sam", "Pierre");
// Flux<Integer> thirdFlux = Flux.just(1, 2);
//
// Tuple2<Flux<Person>, Mono<Long>> result = multiTuMulti.apply(Tuples.of(firstFlux, secondFlux, thirdFlux));
// result.getT1().subscribe(v -> System.out.println("=> 1: " + v));
// result.getT2().subscribe(v -> System.out.println("=> 2: " + v));
// }
@Test
public void testMultiToMultiWithMessageByteArrayPayload() {
FunctionCatalog catalog = this.configureCatalog();
Function<Tuple3<Flux<Message<byte[]>>, Flux<Message<byte[]>>, Flux<Message<byte[]>>>, Tuple2<Flux<Message<byte[]>>, Mono<Message<byte[]>>>> multiTuMulti =
catalog.lookupRaw("multiTuMulti", MimeTypeUtils.parseMimeType("foo/bar"), MimeTypeUtils.parseMimeType("bar/*"));
Flux<Message<byte[]>> firstFlux = Flux.just(
MessageBuilder.withPayload("Unlce".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build(),
MessageBuilder.withPayload("Onlce".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build());
Flux<Message<byte[]>> secondFlux = Flux.just(
MessageBuilder.withPayload("Sam".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build(),
MessageBuilder.withPayload("Pierre".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build());
ByteBuffer one = ByteBuffer.allocate(4);
one.putInt(1);
ByteBuffer two = ByteBuffer.allocate(4);
two.putInt(2);
Flux<Message<byte[]>> thirdFlux = Flux.just(
MessageBuilder.withPayload(one.array()).setHeader(MessageHeaders.CONTENT_TYPE, "octet-stream/integer").build(),
MessageBuilder.withPayload(two.array()).setHeader(MessageHeaders.CONTENT_TYPE, "octet-stream/integer").build());
Tuple2<Flux<Message<byte[]>>, Mono<Message<byte[]>>> result = multiTuMulti.apply(Tuples.of(firstFlux, secondFlux, thirdFlux));
result.getT1().subscribe(v -> System.out.println("=> 1: " + v));
result.getT2().subscribe(v -> System.out.println("=> 2: " + v));
//Tuple2<Object, Object> d = multiTuMulti.apply(Tuples.of(firstFlux, secondFlux, thirdFlux));
}
@EnableAutoConfiguration
@@ -299,6 +321,29 @@ public class LazyFunctionRegistryMultiInOutTests {
};
}
@Bean
public MessageConverter byteArrayToIntegerMessageConverter() {
return new AbstractMessageConverter(MimeTypeUtils.parseMimeType("octet-stream/integer")) {
@Override
protected boolean supports(Class<?> clazz) {
return Integer.class.isAssignableFrom(clazz);
}
protected Object convertFromInternal(
Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
ByteBuffer wrappedPayload = ByteBuffer.wrap((byte[])message.getPayload());
return wrappedPayload.getInt();
}
protected Object convertToInternal(
Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
return null;
}
};
}
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<?>[]> repeater() {

View File

@@ -204,15 +204,8 @@ public class LazyFunctionRegistryTests {
List<String> result = multiInputFunction.apply(Tuples.of(stringStream, intStream)).collectList().block();
System.out.println(result);
}
private class MyFunction {
public String apply(String a, Integer b, Foo c) {
}
}
@Test
public void testMultiInputWithComposition() {
FunctionCatalog catalog = this.configureCatalog();