GH-434 Added generic FunctionInvoker for AWS

- Added generic FunctionInvoker capable of handling the request generically without requiring user to implemen specific AWS request handler

Resolves #434
This commit is contained in:
Oleg Zhurakousky
2019-12-05 19:28:54 +01:00
parent 0f38ea47b8
commit 52b0fdea50
15 changed files with 641 additions and 234 deletions

View File

@@ -0,0 +1,166 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConversionException;
/**
* Variation of {@link MappingJackson2MessageConverter} to support marshalling and
* unmarshalling of Messages's payload from 'String' or 'byte[]' to an instance of a
* 'targetClass' and and back to 'byte[]'.
*
* @author Oleg Zhurakousky
* @author Gary Russell
* @since 2.0
*/
class ApplicationJsonMessageMarshallingConverter extends MappingJackson2MessageConverter {
private final Map<Type, JavaType> typeCache = new ConcurrentHashMap<>();
ApplicationJsonMessageMarshallingConverter(@Nullable ObjectMapper objectMapper) {
if (objectMapper != null) {
this.setObjectMapper(objectMapper);
}
}
@Override
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
@Nullable Object conversionHint) {
if (payload instanceof byte[]) {
return payload;
}
else if (payload instanceof String) {
return ((String) payload).getBytes(StandardCharsets.UTF_8);
}
else {
return super.convertToInternal(payload, headers, conversionHint);
}
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object hint) {
Object conversionHint = hint;
Object result = null;
if (conversionHint instanceof MethodParameter) {
Class<?> conversionHintType = ((MethodParameter) conversionHint)
.getParameterType();
if (Message.class.isAssignableFrom(conversionHintType)) {
/*
* Ensures that super won't attempt to create Message as a result of
* conversion and stays at payload conversion only. The Message will
* eventually be created in
* MessageMethodArgumentResolver.resolveArgument(..)
*/
conversionHint = null;
}
else if (((MethodParameter) conversionHint)
.getGenericParameterType() instanceof ParameterizedType) {
ParameterizedTypeReference<Object> forType = ParameterizedTypeReference
.forType(((MethodParameter) conversionHint)
.getGenericParameterType());
result = convertParameterizedType(message, forType.getType());
}
}
else if (conversionHint instanceof ParameterizedTypeReference) {
result = convertParameterizedType(message, ((ParameterizedTypeReference<?>) conversionHint).getType());
}
else if (conversionHint instanceof ParameterizedType) {
result = convertParameterizedType(message, (Type) conversionHint);
}
if (result == null) {
if (message.getPayload() instanceof byte[]
&& targetClass.isAssignableFrom(String.class)) {
result = new String((byte[]) message.getPayload(),
StandardCharsets.UTF_8);
}
else {
result = super.convertFromInternal(message, targetClass, conversionHint);
}
}
return result;
}
private Object convertParameterizedType(Message<?> message, Type conversionHint) {
ObjectMapper objectMapper = this.getObjectMapper();
Object payload = message.getPayload();
try {
JavaType type = this.typeCache.get(conversionHint);
if (type == null) {
conversionHint = FunctionTypeUtils.isMessage(conversionHint)
? FunctionTypeUtils.getImmediateGenericType(conversionHint, 0)
: conversionHint;
type = objectMapper.getTypeFactory()
.constructType(conversionHint);
this.typeCache.put(conversionHint, type);
}
if (payload instanceof byte[]) {
return objectMapper.readValue((byte[]) payload, type);
}
else if (payload instanceof String) {
return objectMapper.readValue((String) payload, type);
}
else {
final JavaType typeToUse = type;
if (payload instanceof Collection) {
Collection<?> collection = (Collection<?>) ((Collection<?>) payload).stream()
.map(value -> {
try {
if (value instanceof byte[]) {
return objectMapper.readValue((byte[]) value, typeToUse.getContentType());
}
else if (value instanceof String) {
return objectMapper.readValue((String) value, typeToUse.getContentType());
}
}
catch (Exception e) {
logger.error("Failed to convert payload " + value, e);
}
return null;
}).collect(Collectors.toList());
return collection;
}
return null;
}
}
catch (IOException e) {
throw new MessageConversionException("Cannot parse payload ", e);
}
}
}

View File

@@ -47,9 +47,9 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.util.CollectionUtils;
@@ -70,7 +70,7 @@ public class ContextFunctionCatalogAutoConfiguration {
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
@Bean
public FunctionRegistry functionCatalog(Map<String, MessageConverter> messageConverters) {
public FunctionRegistry functionCatalog(Map<String, MessageConverter> messageConverters, @Nullable ObjectMapper objectMapper) {
ConversionService conversionService = new DefaultConversionService();
CompositeMessageConverter messageConverter = null;
List<MessageConverter> mcList = new ArrayList<>();
@@ -88,7 +88,10 @@ public class ContextFunctionCatalogAutoConfiguration {
}
}
if (addDefaultConverters) {
mcList.add(new MappingJackson2MessageConverter());
if (objectMapper == null) {
objectMapper = new ObjectMapper();
}
mcList.add(new ApplicationJsonMessageMarshallingConverter(objectMapper));
mcList.add(new ByteArrayMessageConverter());
mcList.add(new StringMessageConverter());
}

View File

@@ -79,6 +79,25 @@ public class BeanFactoryAwareFunctionRegistryMultiInOutTests {
System.out.println(result);
}
@Test
public void testMultiInputWithPojoConversion() {
FunctionCatalog catalog = this.configureCatalog();
Function<Tuple2<Flux<CartEvent>, Flux<CheckoutEvent>>, Flux<OrderEvent>> multiInputFunction =
catalog.lookup("thomas", "application/json");
CartEvent carEvent = new CartEvent();
carEvent.setCarEvent("carEvent");
Flux<CartEvent> carEventStream = Flux.just(carEvent);
CheckoutEvent checkoutEvent = new CheckoutEvent();
checkoutEvent.setCheckoutEvent("checkoutEvent");
Flux<CheckoutEvent> checkoutEventStream = Flux.just(checkoutEvent);
Tuple2<Flux<CartEvent>, Flux<CheckoutEvent>> streams = Tuples.of(carEventStream, checkoutEventStream);
List<OrderEvent> result = multiInputFunction.apply(streams).collectList().block();
System.out.println(result);
}
@SuppressWarnings("unused")
@Test
@Ignore
@@ -380,6 +399,56 @@ public class BeanFactoryAwareFunctionRegistryMultiInOutTests {
return new Flux[] { repeated, sum };
};
}
@Bean
public Function<Tuple2<Flux<CartEvent>, Flux<CheckoutEvent>>, Flux<OrderEvent>> thomas() {
return tuple -> {
Flux<CartEvent> cartEventStream = tuple.getT1();
Flux<CheckoutEvent> checkoutEventStream = tuple.getT2();
return Flux.zip(cartEventStream, checkoutEventStream, (cartEvent, checkoutEvent) -> {
OrderEvent oe = new OrderEvent();
oe.setOrderEvent(cartEvent.toString() + "- " + checkoutEvent.toString());
return oe;
});
};
}
}
public static class CartEvent {
private String carEvent;
public String getCarEvent() {
return carEvent;
}
public void setCarEvent(String carEvent) {
this.carEvent = carEvent;
}
}
public static class CheckoutEvent {
private String checkoutEvent;
public String getCheckoutEvent() {
return checkoutEvent;
}
public void setCheckoutEvent(String checkoutEvent) {
this.checkoutEvent = checkoutEvent;
}
}
public static class OrderEvent {
private String orderEvent;
public String getOrderEvent() {
return orderEvent;
}
public void setOrderEvent(String orderEvent) {
this.orderEvent = orderEvent;
}
}
public static class Person {

View File

@@ -66,7 +66,7 @@ public class BeanFactoryAwarePojoFunctionRegistryTests {
assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE");
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunction", "application/json");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("MESSAGE");
Function<Flux<String>, Flux<String>> f3 = catalog.lookup("myFunction");
assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO");
@@ -89,7 +89,7 @@ public class BeanFactoryAwarePojoFunctionRegistryTests {
assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE");
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunctionLike", "application/json");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("MESSAGE");
Function<Flux<String>, Flux<String>> f3 = catalog.lookup("myFunctionLike");
assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO");