diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index f53e97d92..a5c2c9512 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -210,6 +210,140 @@ IMPORTANT: IMPORTANT: At the moment, function arity is *only* supported for reac where evaluation and computation on confluence of events typically requires view into a stream of events rather than single event. +=== Type conversion (Content-Type negotiation) + +Content-Type negotiation is one of the core features of Spring Cloud Function as it allows to not only transform the incoming data to the types declared +by the function signature, but to do the same transformation during function composition making otherwise un-composable (by type) functions composable. + +To better understand the mechanics and the necessity behind content-type negotiation, we take a look at a very simple use case by +using the following function as an example: + +[source, java] +---- +@Bean +public Function personFunction {..} +---- + +The function shown in the preceding example expects a `Person` object as an argument and produces a String type as an output. If such function is +invoked with the type `Person`, than all works fine. But typically function plays a role of a handler for the incoming data which most often comes +in the raw format such as `byte[]`, `JSON String` etc. In order for the framework to succeed in passing the incoming data as an argument to +this function, it has to somehow transform the incoming data to a `Person` type. + +Spring Cloud Function relies on two native to Spring mechanisms to accomplish that. + +. _MessageConverter_ - to convert from incoming Message data to a type declared by the function. +. _ConversionService_ - to convert from incoming non-Message data to a type declared by the function. + +This means that depending on the type of the raw data (Message or non-Message) Spring Cloud Function will apply one or the other mechanisms. + +For most cases when dealing with functions that are invoked as part of some other request (e.g., HTTP, Messaging etc) the framework relies on `MessageConverters`, +since such requests already converted to Spring `Message`. In other words, the framework locates and applies the appropriate `MessageConverter`. +To accomplish that, the framework needs some instructions from the user. One of these instructions is already provided by the signature of the function +itself (Person type). Consequently, in theory, that should be (and, in some cases, is) enough. However, for the majority of use cases, in order to +select the appropriate `MessageConverter`, the framework needs an additional piece of information. That missing piece is `contentType` header. + +Such header usually comes as part of the Message where it is injected by the corresponding adapter that created such Message in the first place. +For example, HTTP POST request will have its content-type HTTP header copied to `contentType` header of the Message. + +For cases when such header does not exist framework relies on the default content type as `application/json`. + + +==== Content Type versus Argument Type + +As mentioned earlier, for the framework to select the appropriate `MessageConverter`, it requires argument type and, optionally, content type information. +The logic for selecting the appropriate `MessageConverter` resides with the argument resolvers which trigger right before the invocation of the user-defined +function (which is when the actual argument type is known to the framework). +If the argument type does not match the type of the current payload, the framework delegates to the stack of the +pre-configured `MessageConverters` to see if any one of them can convert the payload. + +The combination of `contentType` and argument type is the mechanism by which framework determines if message can be converted to a target type by locating +the appropriate `MessageConverter`. +If no appropriate `MessageConverter` is found, an exception is thrown, which you can handle by adding a custom `MessageConverter` +(see `<>`). + +NOTE: Do not expect `Message` to be converted into some other type based only on the `contentType`. +Remember that the `contentType` is complementary to the target type. +It is a hint, which `MessageConverter` may or may not take into consideration. + +==== Message Converters + +`MessageConverters` define two methods: + +[source, java] +---- +Object fromMessage(Message message, Class targetClass); + +Message toMessage(Object payload, @Nullable MessageHeaders headers); +---- + +It is important to understand the contract of these methods and their usage, specifically in the context of Spring Cloud Stream. + +The `fromMessage` method converts an incoming `Message` to an argument type. +The payload of the `Message` could be any type, and it is +up to the actual implementation of the `MessageConverter` to support multiple types. + + +==== Provided MessageConverters + +As mentioned earlier, the framework already provides a stack of `MessageConverters` to handle most common use cases. +The following list describes the provided `MessageConverters`, in order of precedence (the first `MessageConverter` that works is used): + +. `JsonMessageConverter`: Supports conversion of the payload of the `Message` to/from POJO for cases when `contentType` is `application/json` using Jackson or Gson libraries (DEFAULT). +. `ByteArrayMessageConverter`: Supports conversion of the payload of the `Message` from `byte[]` to `byte[]` for cases when `contentType` is `application/octet-stream`. It is essentially a pass through and exists primarily for backward compatibility. +. `StringMessageConverter`: Supports conversion of any type to a `String` when `contentType` is `text/plain`. + +When no appropriate converter is found, the framework throws an exception. When that happens, you should check your code and configuration and ensure you did +not miss anything (that is, ensure that you provided a `contentType` by using a binding or a header). +However, most likely, you found some uncommon case (such as a custom `contentType` perhaps) and the current stack of provided `MessageConverters` +does not know how to convert. If that is the case, you can add custom `MessageConverter`. See <>. + +[[user-defined-message-converters]] +==== User-defined Message Converters + +Spring Cloud Function exposes a mechanism to define and register additional `MessageConverters`. +To use it, implement `org.springframework.messaging.converter.MessageConverter`, configure it as a `@Bean`. +It is then appended to the existing stack of `MessageConverter`s. + +NOTE: It is important to understand that custom `MessageConverter` implementations are added to the head of the existing stack. +Consequently, custom `MessageConverter` implementations take precedence over the existing ones, which lets you override as well as add to the existing converters. + +The following example shows how to create a message converter bean to support a new content type called `application/bar`: + +[source,java] +---- +@SpringBootApplication +public static class SinkApplication { + + ... + + @Bean + public MessageConverter customMessageConverter() { + return new MyCustomMessageConverter(); + } +} + +public class MyCustomMessageConverter extends AbstractMessageConverter { + + public MyCustomMessageConverter() { + super(new MimeType("application", "bar")); + } + + @Override + protected boolean supports(Class clazz) { + return (Bar.class.equals(clazz)); + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + Object payload = message.getPayload(); + return (payload instanceof Bar ? payload : new Bar((byte[]) payload)); + } +} +---- + + + + === Kotlin Lambda support We also provide support for Kotlin lambdas (since v2.0). diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index c5596c7dc..3a2b3376b 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -31,10 +31,6 @@ import java.util.Map; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.amazonaws.services.lambda.runtime.events.S3Event; -import com.amazonaws.services.lambda.runtime.events.SNSEvent; -import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; @@ -165,7 +161,7 @@ public class FunctionInvoker implements RequestStreamHandler { @SuppressWarnings({ "unchecked", "rawtypes" }) private Message generateMessage(InputStream input, Context context) throws IOException { - byte[] payload = StreamUtils.copyToByteArray(input); + final byte[] payload = StreamUtils.copyToByteArray(input); if (logger.isInfoEnabled()) { logger.info("Incoming JSON for ApiGateway Event: " + new String(payload)); @@ -183,34 +179,8 @@ public class FunctionInvoker implements RequestStreamHandler { if (requestMap.containsKey("Records")) { List> records = (List>) requestMap.get("Records"); Assert.notEmpty(records, "Incoming event has no records: " + requestMap); - if (this.isKinesisEvent(records.get(0))) { - logger.info("Incoming request is Kinesis Event"); - Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class) inputType) || mapInputType, - "Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event"); - Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class); - messageBuilder = MessageBuilder.withPayload(event); - } - else if (this.isS3Event(records.get(0))) { - logger.info("Incoming request is S3 Event"); - Assert.isTrue(inputType instanceof Class && S3Event.class.isAssignableFrom((Class) inputType) || mapInputType, - "Only S3Event or Map type is supported as input type for functions that accept S3 Event"); - Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, S3Event.class); - messageBuilder = MessageBuilder.withPayload(event); - } - else if (this.isSNSEvent(records.get(0))) { - logger.info("Incoming request is SNS Event"); - Assert.isTrue(inputType instanceof Class && SNSEvent.class.isAssignableFrom((Class) inputType) || mapInputType, - "Only SNSEvent or Map type is supported as input type for functions that accept SNSEvent"); - Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SNSEvent.class); - messageBuilder = MessageBuilder.withPayload(event); - } - else { - logger.info("Incoming request is SQS Event"); - Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class) inputType) || mapInputType, - "Only SQSEvent or Map type is supported as input type for functions that accept SQS Event"); - Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SQSEvent.class); - messageBuilder = MessageBuilder.withPayload(event); - } + this.logEvent(records); + messageBuilder = MessageBuilder.withPayload(payload); } else if (requestMap.containsKey("httpMethod")) { // API Gateway logger.info("Incoming request is API Gateway"); @@ -219,14 +189,12 @@ public class FunctionInvoker implements RequestStreamHandler { messageBuilder = MessageBuilder.withPayload(gatewayEvent); } else if (mapInputType) { - messageBuilder = MessageBuilder.withPayload(requestMap) - .setHeader("httpMethod", requestMap.get("httpMethod")); + messageBuilder = MessageBuilder.withPayload(requestMap).setHeader("httpMethod", requestMap.get("httpMethod")); } else { Object body = requestMap.remove("body"); body = body instanceof String ? ("\"" + body + "\"").getBytes(StandardCharsets.UTF_8) : mapper.writeValueAsBytes(body); - messageBuilder = MessageBuilder.withPayload(body) - .copyHeaders(requestMap); + messageBuilder = MessageBuilder.withPayload(body).copyHeaders(requestMap); } } } @@ -236,6 +204,21 @@ public class FunctionInvoker implements RequestStreamHandler { return messageBuilder.setHeader("aws-context", context).build(); } + private void logEvent(List> records) { + if (this.isKinesisEvent(records.get(0))) { + logger.info("Incoming request is Kinesis Event"); + } + else if (this.isS3Event(records.get(0))) { + logger.info("Incoming request is S3 Event"); + } + else if (this.isSNSEvent(records.get(0))) { + logger.info("Incoming request is SNS Event"); + } + else { + logger.info("Incoming request is SQS Event"); + } + } + private boolean isSNSEvent(Map record) { return record.containsKey("Sns"); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index 79b7b5630..25d43c714 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -29,13 +29,14 @@ import com.amazonaws.services.lambda.runtime.events.S3Event; import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.util.MimeType; import static org.assertj.core.api.Assertions.assertThat; @@ -359,15 +360,15 @@ public class FunctionInvokerTests { @Test public void testKinesisStringEvent() throws Exception { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoString"); - FunctionInvoker invoker = new FunctionInvoker(); + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); - InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); - }); + InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("kinesisSchemaVersion"); } @Test @@ -414,15 +415,15 @@ public class FunctionInvokerTests { @Test public void testSQSStringEvent() throws Exception { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoString"); - FunctionInvoker invoker = new FunctionInvoker(); + System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); - InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); - }); + InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result.length()).isEqualTo(14); // some additional JSON formatting } @Test @@ -469,15 +470,15 @@ public class FunctionInvokerTests { @Test public void testSNSStringEvent() throws Exception { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoString"); - FunctionInvoker invoker = new FunctionInvoker(); + System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); - InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); - }); + InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sns"); } @Test @@ -522,18 +523,17 @@ public class FunctionInvokerTests { assertThat(result).contains("arn:aws:sns"); } - @Test public void testS3StringEvent() throws Exception { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoString"); - FunctionInvoker invoker = new FunctionInvoker(); + System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); - InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); - }); + InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("s3SchemaVersion"); } @Test @@ -693,8 +693,11 @@ public class FunctionInvokerTests { @Configuration public static class SQSConfiguration { @Bean - public Function echoString() { - return v -> v; + public Function echoString() { + return v -> { + System.out.println("Echo: " + v); + return v.toString(); + }; } @Bean @@ -720,6 +723,32 @@ public class FunctionInvokerTests { return v.toString(); }; } + + @Bean + public MyCustomMessageConverter messageConverter() { + return new MyCustomMessageConverter(); + } + } + + public static class MyCustomMessageConverter extends AbstractMessageConverter { + + public MyCustomMessageConverter() { + super(new MimeType("*", "*")); + } + + @Override + protected boolean supports(Class clazz) { + return (Person.class.equals(clazz)); + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + Object payload = message.getPayload(); + String v = payload instanceof String ? (String) payload : new String((byte[]) payload); + Person person = new Person(); + person.setName(v.substring(0, 10)); + return person; + } } @EnableAutoConfiguration @@ -727,7 +756,10 @@ public class FunctionInvokerTests { public static class SNSConfiguration { @Bean public Function echoString() { - return v -> v; + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; } @Bean @@ -834,5 +866,10 @@ public class FunctionInvokerTests { public void setName(String name) { this.name = name; } + + @Override + public String toString() { + return this.name; + } } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java index 9a34d284c..9bad95929 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java @@ -77,8 +77,13 @@ public class JsonMessageConverter extends AbstractMessageConverter { } Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint; - Object result = jsonMapper.fromJson(message.getPayload(), convertToType); - return result; + try { + return this.jsonMapper.fromJson(message.getPayload(), convertToType); + } + catch (Exception e) { + // ignore + } + return null; } @Override diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java index fde6f87b0..e0120a5b7 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java @@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** * @author Dave Syer @@ -32,8 +30,6 @@ import org.apache.commons.logging.LogFactory; */ public class JacksonMapper extends JsonMapper { - private static Log logger = LogFactory.getLog(JsonMapper.class); - private final ObjectMapper mapper; public JacksonMapper(ObjectMapper mapper) { @@ -62,7 +58,7 @@ public class JacksonMapper extends JsonMapper { } } catch (Exception e) { - logger.warn("Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here", e); + throw new IllegalStateException("Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here", e); } return convertedValue; }