GH-562 Add type conversion documentation

Add test in AWS to showcase type conversion
Fix AWS FunctionInvoker to delegate to effectively delegate type conversion to the native mechanism of spring-cloud-function

Resolves #562
This commit is contained in:
Oleg Zhurakousky
2020-07-29 20:17:03 +02:00
parent 48b8f4ce89
commit 6b9ce4cb0c
5 changed files with 236 additions and 81 deletions

View File

@@ -210,6 +210,140 @@ IMPORTANT: IMPORTANT: At the moment, function arity is *only* supported for reac
where evaluation and computation on confluence of events typically requires view into a
stream of events rather than single event.
=== Type conversion (Content-Type negotiation)
Content-Type negotiation is one of the core features of Spring Cloud Function as it allows to not only transform the incoming data to the types declared
by the function signature, but to do the same transformation during function composition making otherwise un-composable (by type) functions composable.
To better understand the mechanics and the necessity behind content-type negotiation, we take a look at a very simple use case by
using the following function as an example:
[source, java]
----
@Bean
public Function<Person, String> personFunction {..}
----
The function shown in the preceding example expects a `Person` object as an argument and produces a String type as an output. If such function is
invoked with the type `Person`, than all works fine. But typically function plays a role of a handler for the incoming data which most often comes
in the raw format such as `byte[]`, `JSON String` etc. In order for the framework to succeed in passing the incoming data as an argument to
this function, it has to somehow transform the incoming data to a `Person` type.
Spring Cloud Function relies on two native to Spring mechanisms to accomplish that.
. _MessageConverter_ - to convert from incoming Message data to a type declared by the function.
. _ConversionService_ - to convert from incoming non-Message data to a type declared by the function.
This means that depending on the type of the raw data (Message or non-Message) Spring Cloud Function will apply one or the other mechanisms.
For most cases when dealing with functions that are invoked as part of some other request (e.g., HTTP, Messaging etc) the framework relies on `MessageConverters`,
since such requests already converted to Spring `Message`. In other words, the framework locates and applies the appropriate `MessageConverter`.
To accomplish that, the framework needs some instructions from the user. One of these instructions is already provided by the signature of the function
itself (Person type). Consequently, in theory, that should be (and, in some cases, is) enough. However, for the majority of use cases, in order to
select the appropriate `MessageConverter`, the framework needs an additional piece of information. That missing piece is `contentType` header.
Such header usually comes as part of the Message where it is injected by the corresponding adapter that created such Message in the first place.
For example, HTTP POST request will have its content-type HTTP header copied to `contentType` header of the Message.
For cases when such header does not exist framework relies on the default content type as `application/json`.
==== Content Type versus Argument Type
As mentioned earlier, for the framework to select the appropriate `MessageConverter`, it requires argument type and, optionally, content type information.
The logic for selecting the appropriate `MessageConverter` resides with the argument resolvers which trigger right before the invocation of the user-defined
function (which is when the actual argument type is known to the framework).
If the argument type does not match the type of the current payload, the framework delegates to the stack of the
pre-configured `MessageConverters` to see if any one of them can convert the payload.
The combination of `contentType` and argument type is the mechanism by which framework determines if message can be converted to a target type by locating
the appropriate `MessageConverter`.
If no appropriate `MessageConverter` is found, an exception is thrown, which you can handle by adding a custom `MessageConverter`
(see `<<user-defined-message-converters>>`).
NOTE: Do not expect `Message` to be converted into some other type based only on the `contentType`.
Remember that the `contentType` is complementary to the target type.
It is a hint, which `MessageConverter` may or may not take into consideration.
==== Message Converters
`MessageConverters` define two methods:
[source, java]
----
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
----
It is important to understand the contract of these methods and their usage, specifically in the context of Spring Cloud Stream.
The `fromMessage` method converts an incoming `Message` to an argument type.
The payload of the `Message` could be any type, and it is
up to the actual implementation of the `MessageConverter` to support multiple types.
==== Provided MessageConverters
As mentioned earlier, the framework already provides a stack of `MessageConverters` to handle most common use cases.
The following list describes the provided `MessageConverters`, in order of precedence (the first `MessageConverter` that works is used):
. `JsonMessageConverter`: Supports conversion of the payload of the `Message` to/from POJO for cases when `contentType` is `application/json` using Jackson or Gson libraries (DEFAULT).
. `ByteArrayMessageConverter`: Supports conversion of the payload of the `Message` from `byte[]` to `byte[]` for cases when `contentType` is `application/octet-stream`. It is essentially a pass through and exists primarily for backward compatibility.
. `StringMessageConverter`: Supports conversion of any type to a `String` when `contentType` is `text/plain`.
When no appropriate converter is found, the framework throws an exception. When that happens, you should check your code and configuration and ensure you did
not miss anything (that is, ensure that you provided a `contentType` by using a binding or a header).
However, most likely, you found some uncommon case (such as a custom `contentType` perhaps) and the current stack of provided `MessageConverters`
does not know how to convert. If that is the case, you can add custom `MessageConverter`. See <<user-defined-message-converters>>.
[[user-defined-message-converters]]
==== User-defined Message Converters
Spring Cloud Function exposes a mechanism to define and register additional `MessageConverters`.
To use it, implement `org.springframework.messaging.converter.MessageConverter`, configure it as a `@Bean`.
It is then appended to the existing stack of `MessageConverter`s.
NOTE: It is important to understand that custom `MessageConverter` implementations are added to the head of the existing stack.
Consequently, custom `MessageConverter` implementations take precedence over the existing ones, which lets you override as well as add to the existing converters.
The following example shows how to create a message converter bean to support a new content type called `application/bar`:
[source,java]
----
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
----
=== Kotlin Lambda support
We also provide support for Kotlin lambdas (since v2.0).

View File

@@ -31,10 +31,6 @@ import java.util.Map;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
@@ -165,7 +161,7 @@ public class FunctionInvoker implements RequestStreamHandler {
@SuppressWarnings({ "unchecked", "rawtypes" })
private Message<byte[]> generateMessage(InputStream input, Context context) throws IOException {
byte[] payload = StreamUtils.copyToByteArray(input);
final byte[] payload = StreamUtils.copyToByteArray(input);
if (logger.isInfoEnabled()) {
logger.info("Incoming JSON for ApiGateway Event: " + new String(payload));
@@ -183,34 +179,8 @@ public class FunctionInvoker implements RequestStreamHandler {
if (requestMap.containsKey("Records")) {
List<Map<String, ?>> records = (List<Map<String, ?>>) requestMap.get("Records");
Assert.notEmpty(records, "Incoming event has no records: " + requestMap);
if (this.isKinesisEvent(records.get(0))) {
logger.info("Incoming request is Kinesis Event");
Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else if (this.isS3Event(records.get(0))) {
logger.info("Incoming request is S3 Event");
Assert.isTrue(inputType instanceof Class && S3Event.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only S3Event or Map type is supported as input type for functions that accept S3 Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, S3Event.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else if (this.isSNSEvent(records.get(0))) {
logger.info("Incoming request is SNS Event");
Assert.isTrue(inputType instanceof Class && SNSEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only SNSEvent or Map type is supported as input type for functions that accept SNSEvent");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SNSEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else {
logger.info("Incoming request is SQS Event");
Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only SQSEvent or Map type is supported as input type for functions that accept SQS Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SQSEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
this.logEvent(records);
messageBuilder = MessageBuilder.withPayload(payload);
}
else if (requestMap.containsKey("httpMethod")) { // API Gateway
logger.info("Incoming request is API Gateway");
@@ -219,14 +189,12 @@ public class FunctionInvoker implements RequestStreamHandler {
messageBuilder = MessageBuilder.withPayload(gatewayEvent);
}
else if (mapInputType) {
messageBuilder = MessageBuilder.withPayload(requestMap)
.setHeader("httpMethod", requestMap.get("httpMethod"));
messageBuilder = MessageBuilder.withPayload(requestMap).setHeader("httpMethod", requestMap.get("httpMethod"));
}
else {
Object body = requestMap.remove("body");
body = body instanceof String ? ("\"" + body + "\"").getBytes(StandardCharsets.UTF_8) : mapper.writeValueAsBytes(body);
messageBuilder = MessageBuilder.withPayload(body)
.copyHeaders(requestMap);
messageBuilder = MessageBuilder.withPayload(body).copyHeaders(requestMap);
}
}
}
@@ -236,6 +204,21 @@ public class FunctionInvoker implements RequestStreamHandler {
return messageBuilder.setHeader("aws-context", context).build();
}
private void logEvent(List<Map<String, ?>> records) {
if (this.isKinesisEvent(records.get(0))) {
logger.info("Incoming request is Kinesis Event");
}
else if (this.isS3Event(records.get(0))) {
logger.info("Incoming request is S3 Event");
}
else if (this.isSNSEvent(records.get(0))) {
logger.info("Incoming request is SNS Event");
}
else {
logger.info("Incoming request is SQS Event");
}
}
private boolean isSNSEvent(Map<String, ?> record) {
return record.containsKey("Sns");
}

View File

@@ -29,13 +29,14 @@ import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;
import static org.assertj.core.api.Assertions.assertThat;
@@ -359,15 +360,15 @@ public class FunctionInvokerTests {
@Test
public void testKinesisStringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
});
InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("kinesisSchemaVersion");
}
@Test
@@ -414,15 +415,15 @@ public class FunctionInvokerTests {
@Test
public void testSQSStringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
});
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result.length()).isEqualTo(14); // some additional JSON formatting
}
@Test
@@ -469,15 +470,15 @@ public class FunctionInvokerTests {
@Test
public void testSNSStringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
});
InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sns");
}
@Test
@@ -522,18 +523,17 @@ public class FunctionInvokerTests {
assertThat(result).contains("arn:aws:sns");
}
@Test
public void testS3StringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
});
InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("s3SchemaVersion");
}
@Test
@@ -693,8 +693,11 @@ public class FunctionInvokerTests {
@Configuration
public static class SQSConfiguration {
@Bean
public Function<String, String> echoString() {
return v -> v;
public Function<Person, String> echoString() {
return v -> {
System.out.println("Echo: " + v);
return v.toString();
};
}
@Bean
@@ -720,6 +723,32 @@ public class FunctionInvokerTests {
return v.toString();
};
}
@Bean
public MyCustomMessageConverter messageConverter() {
return new MyCustomMessageConverter();
}
}
public static class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("*", "*"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Person.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
String v = payload instanceof String ? (String) payload : new String((byte[]) payload);
Person person = new Person();
person.setName(v.substring(0, 10));
return person;
}
}
@EnableAutoConfiguration
@@ -727,7 +756,10 @@ public class FunctionInvokerTests {
public static class SNSConfiguration {
@Bean
public Function<String, String> echoString() {
return v -> v;
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
@Bean
@@ -834,5 +866,10 @@ public class FunctionInvokerTests {
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return this.name;
}
}
}

View File

@@ -77,8 +77,13 @@ public class JsonMessageConverter extends AbstractMessageConverter {
}
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
Object result = jsonMapper.fromJson(message.getPayload(), convertToType);
return result;
try {
return this.jsonMapper.fromJson(message.getPayload(), convertToType);
}
catch (Exception e) {
// ignore
}
return null;
}
@Override

View File

@@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author Dave Syer
@@ -32,8 +30,6 @@ import org.apache.commons.logging.LogFactory;
*/
public class JacksonMapper extends JsonMapper {
private static Log logger = LogFactory.getLog(JsonMapper.class);
private final ObjectMapper mapper;
public JacksonMapper(ObjectMapper mapper) {
@@ -62,7 +58,7 @@ public class JacksonMapper extends JsonMapper {
}
}
catch (Exception e) {
logger.warn("Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here", e);
throw new IllegalStateException("Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here", e);
}
return convertedValue;
}