diff --git a/README.adoc b/README.adoc index 356b3a48f..3d99877fe 100644 --- a/README.adoc +++ b/README.adoc @@ -100,31 +100,6 @@ HELLOWORLD (You can use `^Q^J` in a terminal to insert a new line in a literal string like that.) -== Building and Running a Function - -The sample `@SpringBootApplication` above has a function that can be -decorated at runtime by Spring Cloud Function to be an HTTP endpoint, -or a Stream processor, for instance with RabbitMQ, Apache Kafka or -JMS. - -The `@Beans` can be `Function`, `Consumer` or `Supplier` (all from -`java.util`), and their parametric types can be String or POJO. - -Functions can also be of `Flux` or `Flux` and Spring -Cloud Function takes care of converting the data to and from the -desired types, as long as it comes in as plain text or (in the case of -the POJO) JSON. There is also support for `Message` where the -message headers are copied from the incoming event, depending on the -adapter. The web adapter also supports conversion from form-encoded -data to a `Map`, and if you are using the function with Spring Cloud -Stream then all the conversion and coercion features for message -payloads will be applicable as well. - -Functions can be grouped together in a single application, or deployed -one-per-jar. It's up to the developer to choose. An app with multiple -functions can be deployed multiple times in different "personalities", -exposing different functions over different physical transports. - == Building :jdkversion: 1.7 diff --git a/docs/src/main/asciidoc/adapters/aws-intro.adoc b/docs/src/main/asciidoc/adapters/aws-intro.adoc index 28e567427..a5e308037 100644 --- a/docs/src/main/asciidoc/adapters/aws-intro.adoc +++ b/docs/src/main/asciidoc/adapters/aws-intro.adoc @@ -4,10 +4,77 @@ The https://aws.amazon.com/[AWS] adapter takes a Spring Cloud Function app and converts it to a form that can run in AWS Lambda. +The details of how to get stared with AWS Lambda is out of scope of this document, so the expectation is that user has some familiarity with +AWS and AWS Lambda and wants to learn what additional value spring provides. + +==== Getting Started + +One of the goals of Spring Cloud Function framework is to provide necessary infrastructure elements to enable a _simple function application_ +to interact in a certain way in a particular environment. +A simple function application (in context or Spring) is an application that contains beans of type Supplier, Function or Consumer. +So, with AWS it means that a simple function bean should somehow be recognised and executed in AWS Lambda environment. + +Let’s look at the example: + +[source, java] +---- +@SpringBootApplication +public class FunctionConfiguration { + + public static void main(String[] args) { + SpringApplication.run(FunctionConfiguration.class, args); + } + + @Bean + public Function uppercase() { + return value -> value.toUpperCase(); + } +} +---- + +It shows a complete Spring Boot application with a function bean defined in it. What’s interesting is that on the surface this is just +another boot app, but in the context of AWS Adapter it is also a perfectly valid AWS Lambda application. No other code or configuration +is required. All you need to do is package it and deploy it, so let’s look how we can do that. + +To make things simpler we’ve provided a sample project ready to be built and deployed and you can access it +https://github.com/spring-cloud/spring-cloud-function/tree/master/spring-cloud-function-samples/function-sample-aws[here]. + +You simply execute `./mvnw clean package` to generate JAR file. All the necessary maven plugins have already been setup to generate +appropriate AWS deployable JAR file. (You can read more details about JAR layout in <>). + +Then you have to upload the JAR file (via AWS dashboard or AWS CLI) to AWS. + +When ask about _handler_ you specify `org.springframework.cloud.function.adapter.aws.FunctionInvoker::handleRequest` which is a generic request handler. + +image::{github-raw}/docs/src/main/asciidoc/images/AWS-deploy.png[width=800,scaledwidth="75%",align="center"] + +That is all. Save and execute the function with some sample data which for this function is expected to be a +String which function will uppercase and return back. + +While `org.springframework.cloud.function.adapter.aws.FunctionInvoker` is a general purpose AWS's `RequestHandler` implementation aimed at completely +isolating you from the specifics of AWS Lambda API, for some cases you may want to specify which specific AWS's `RequestHandler` you want +to use. The next section will explain you how you can accomplish just that. + + +==== AWS Request Handlers + +The adapter has a couple of generic request handlers that you can use. The most generic is (and the one we used in the Getting Started section) +is `org.springframework.cloud.function.adapter.aws.FunctionInvoke` which is the implementation of AWS's `RequestStreamHandler`. +User doesn't need to do anything other then specify it as 'handler' on AWS dashborad when deplioyimng function. +It will handle most of the case including Kinesis, streaming etc. . + + + +The most generic is +`SpringBootStreamHandler`, which uses a Jackson `ObjectMapper` provided by Spring Boot to serialize and deserialize the objects +in the function. There is also a `SpringBootRequestHandler` which you can extend, and provide the input and output types as type +parameters (enabling AWS to inspect the class and do the JSON conversions itself). + +If your app has more than one `@Bean` of type `Function` etc. then you can choose the one to use by configuring `function.name` +(e.g. as `FUNCTION_NAME` environment variable in AWS). The functions are extracted from the Spring Cloud `FunctionCatalog` +(searching first for `Function` then `Consumer` and finally `Supplier`). -The adapter has a couple of generic request handlers that you can use. The most generic is `SpringBootStreamHandler`, which uses a Jackson `ObjectMapper` provided by Spring Boot to serialize and deserialize the objects in the function. There is also a `SpringBootRequestHandler` which you can extend, and provide the input and output types as type parameters (enabling AWS to inspect the class and do the JSON conversions itself). -If your app has more than one `@Bean` of type `Function` etc. then you can choose the one to use by configuring `function.name` (e.g. as `FUNCTION_NAME` environment variable in AWS). The functions are extracted from the Spring Cloud `FunctionCatalog` (searching first for `Function` then `Consumer` and finally `Supplier`). ==== Notes on JAR Layout diff --git a/docs/src/main/asciidoc/images/AWS-deploy.png b/docs/src/main/asciidoc/images/AWS-deploy.png new file mode 100644 index 000000000..c840e1396 Binary files /dev/null and b/docs/src/main/asciidoc/images/AWS-deploy.png differ diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java new file mode 100644 index 000000000..5579c4aa8 --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -0,0 +1,143 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.boot.SpringApplication; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.utils.FunctionClassUtils; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.Environment; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.StreamUtils; +import org.springframework.util.StringUtils; + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + * + * see + * https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-output-format + */ +public class FunctionInvoker implements RequestStreamHandler { + + private static Log logger = LogFactory.getLog(FunctionInvoker.class); + + private ObjectMapper mapper; + + private boolean started; + + private Function, Message> function; + + @Override + public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { + if (!this.started) { + this.start(); + } + + Message requestMessage = this.generateMessage(input, context); + + Message responseMessage = this.function.apply(requestMessage); + + byte[] responseBytes = responseMessage.getPayload(); + if (requestMessage.getHeaders().containsKey("httpMethod")) { + Map response = new HashMap(); + response.put("isBase64Encoded", false); + response.put("statusCode", 200); + response.put("body", new String(responseMessage.getPayload(), StandardCharsets.UTF_8)); + response.put("headers", Collections.singletonMap("foo", "bar")); + + responseBytes = mapper.writeValueAsBytes(response); + } + + StreamUtils.copy(responseBytes, output); + } + + private void start() { + ConfigurableApplicationContext context = SpringApplication.run(FunctionClassUtils.getStartClass()); + Environment environment = context.getEnvironment(); + String functionName = environment.getProperty("spring.cloud.function.definition"); + FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class); + this.mapper = context.getBean(ObjectMapper.class); + this.configureObjectMapper(); + + if (logger.isInfoEnabled()) { + logger.info("Locating function: '" + functionName + "'"); + } + + this.function = functionCatalog.lookup(functionName, "application/json"); + Assert.notNull(this.function, "Failed to lookup function " + functionName); + + if (!StringUtils.hasText(functionName)) { + FunctionInspector inspector = context.getBean(FunctionInspector.class); + functionName = inspector.getRegistration(this.function).getNames().toString(); + } + + if (logger.isInfoEnabled()) { + logger.info("Located function: '" + functionName + "'"); + } + this.started = true; + } + + private void configureObjectMapper() { + SimpleModule module = new SimpleModule(); + module.addDeserializer(Date.class, new JsonDeserializer() { + @Override + public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(jsonParser.getValueAsLong()); + return calendar.getTime(); + } + }); + mapper.registerModule(module); + mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); + } + + private Message generateMessage(InputStream input, Context context) throws IOException { + byte[] payload = StreamUtils.copyToByteArray(input); + + Message message = MessageBuilder.withPayload(payload).setHeader("aws-context", context).build(); + + return message; + } +} diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java new file mode 100644 index 000000000..448c14aec --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -0,0 +1,154 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * + * @author Oleg Zhurakousky + * + */ +public class FunctionInvokerTests { + + String sampleEvent = "{" + + " \"Records\": [" + + " {" + + " \"kinesis\": {" + + " \"kinesisSchemaVersion\": \"1.0\"," + + " \"partitionKey\": \"1\"," + + " \"sequenceNumber\": \"49590338271490256608559692538361571095921575989136588898\"," + + " \"data\": \"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==\"," + + " \"approximateArrivalTimestamp\": 1545084650.987" + + " }," + + " \"eventSource\": \"aws:kinesis\"," + + " \"eventVersion\": \"1.0\"," + + " \"eventID\": \"shardId-000000000006:49590338271490256608559692538361571095921575989136588898\"," + + " \"eventName\": \"aws:kinesis:record\"," + + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\"," + + " \"awsRegion\": \"us-east-2\"," + + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"" + + " }," + + " {" + + " \"kinesis\": {" + + " \"kinesisSchemaVersion\": \"1.0\"," + + " \"partitionKey\": \"1\"," + + " \"sequenceNumber\": \"49590338271490256608559692540925702759324208523137515618\"," + + " \"data\": \"VGhpcyBpcyBvbmx5IGEgdGVzdC4=\"," + + " \"approximateArrivalTimestamp\": 1545084711.166" + + " }," + + " \"eventSource\": \"aws:kinesis\"," + + " \"eventVersion\": \"1.0\"," + + " \"eventID\": \"shardId-000000000006:49590338271490256608559692540925702759324208523137515618\"," + + " \"eventName\": \"aws:kinesis:record\"," + + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\"," + + " \"awsRegion\": \"us-east-2\"," + + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"" + + " }" + + " ]" + + "}"; + + @Test + public void testKinesisStringMessageEvent() throws Exception { + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoStringMessage"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).isEqualTo(this.sampleEvent); + } + + @Test + public void testKinesisStringEvent() throws Exception { + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoStringMessage"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + System.out.println(result); + assertThat(result).isEqualTo(this.sampleEvent); + } + + + @Test + public void testKinesisEvent() throws Exception { + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoKinesisEvent"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + System.out.println(result); + assertThat(result).contains("\"sequenceNumber\":\"49590338271490256608559692538361571095921575989136588898\""); + } + + + + @EnableAutoConfiguration + @Configuration + public static class KinesisConfiguration { + + @Bean + public Function, Message> echoStringMessage() { + return v -> { + System.out.println("Received: " + v); + return v; + }; + } + + @Bean + public Function echoString() { + return v -> { + System.out.println("Received: " + v); + return v; + }; + } + + @Bean + public Function echoKinesisEvent() { + return v -> { + System.out.println("Received: " + v); + return v; + }; + } + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ApplicationJsonMessageMarshallingConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ApplicationJsonMessageMarshallingConverter.java new file mode 100644 index 000000000..6390a3e40 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ApplicationJsonMessageMarshallingConverter.java @@ -0,0 +1,166 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.config; + +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.core.MethodParameter; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.converter.MessageConversionException; + +/** + * Variation of {@link MappingJackson2MessageConverter} to support marshalling and + * unmarshalling of Messages's payload from 'String' or 'byte[]' to an instance of a + * 'targetClass' and and back to 'byte[]'. + * + * @author Oleg Zhurakousky + * @author Gary Russell + * @since 2.0 + */ +class ApplicationJsonMessageMarshallingConverter extends MappingJackson2MessageConverter { + + private final Map typeCache = new ConcurrentHashMap<>(); + + ApplicationJsonMessageMarshallingConverter(@Nullable ObjectMapper objectMapper) { + if (objectMapper != null) { + this.setObjectMapper(objectMapper); + } + } + + @Override + protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, + @Nullable Object conversionHint) { + if (payload instanceof byte[]) { + return payload; + } + else if (payload instanceof String) { + return ((String) payload).getBytes(StandardCharsets.UTF_8); + } + else { + return super.convertToInternal(payload, headers, conversionHint); + } + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object hint) { + Object conversionHint = hint; + Object result = null; + if (conversionHint instanceof MethodParameter) { + Class conversionHintType = ((MethodParameter) conversionHint) + .getParameterType(); + if (Message.class.isAssignableFrom(conversionHintType)) { + /* + * Ensures that super won't attempt to create Message as a result of + * conversion and stays at payload conversion only. The Message will + * eventually be created in + * MessageMethodArgumentResolver.resolveArgument(..) + */ + conversionHint = null; + } + else if (((MethodParameter) conversionHint) + .getGenericParameterType() instanceof ParameterizedType) { + ParameterizedTypeReference forType = ParameterizedTypeReference + .forType(((MethodParameter) conversionHint) + .getGenericParameterType()); + result = convertParameterizedType(message, forType.getType()); + } + } + else if (conversionHint instanceof ParameterizedTypeReference) { + result = convertParameterizedType(message, ((ParameterizedTypeReference) conversionHint).getType()); + } + else if (conversionHint instanceof ParameterizedType) { + result = convertParameterizedType(message, (Type) conversionHint); + } + + if (result == null) { + if (message.getPayload() instanceof byte[] + && targetClass.isAssignableFrom(String.class)) { + result = new String((byte[]) message.getPayload(), + StandardCharsets.UTF_8); + } + else { + result = super.convertFromInternal(message, targetClass, conversionHint); + } + } + + return result; + } + + private Object convertParameterizedType(Message message, Type conversionHint) { + ObjectMapper objectMapper = this.getObjectMapper(); + Object payload = message.getPayload(); + try { + JavaType type = this.typeCache.get(conversionHint); + if (type == null) { + conversionHint = FunctionTypeUtils.isMessage(conversionHint) + ? FunctionTypeUtils.getImmediateGenericType(conversionHint, 0) + : conversionHint; + type = objectMapper.getTypeFactory() + .constructType(conversionHint); + this.typeCache.put(conversionHint, type); + } + if (payload instanceof byte[]) { + return objectMapper.readValue((byte[]) payload, type); + } + else if (payload instanceof String) { + return objectMapper.readValue((String) payload, type); + } + else { + final JavaType typeToUse = type; + if (payload instanceof Collection) { + Collection collection = (Collection) ((Collection) payload).stream() + .map(value -> { + try { + if (value instanceof byte[]) { + return objectMapper.readValue((byte[]) value, typeToUse.getContentType()); + } + else if (value instanceof String) { + return objectMapper.readValue((String) value, typeToUse.getContentType()); + } + } + catch (Exception e) { + logger.error("Failed to convert payload " + value, e); + } + return null; + }).collect(Collectors.toList()); + + return collection; + } + return null; + } + } + catch (IOException e) { + throw new MessageConversionException("Cannot parse payload ", e); + } + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 92130c7be..0a366f932 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -47,9 +47,9 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.DefaultConversionService; +import org.springframework.lang.Nullable; import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.converter.CompositeMessageConverter; -import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.util.CollectionUtils; @@ -70,7 +70,7 @@ public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @Bean - public FunctionRegistry functionCatalog(Map messageConverters) { + public FunctionRegistry functionCatalog(Map messageConverters, @Nullable ObjectMapper objectMapper) { ConversionService conversionService = new DefaultConversionService(); CompositeMessageConverter messageConverter = null; List mcList = new ArrayList<>(); @@ -88,7 +88,10 @@ public class ContextFunctionCatalogAutoConfiguration { } } if (addDefaultConverters) { - mcList.add(new MappingJackson2MessageConverter()); + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + } + mcList.add(new ApplicationJsonMessageMarshallingConverter(objectMapper)); mcList.add(new ByteArrayMessageConverter()); mcList.add(new StringMessageConverter()); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryMultiInOutTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryMultiInOutTests.java index 8e4b7c4b3..6caae2513 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryMultiInOutTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryMultiInOutTests.java @@ -79,6 +79,25 @@ public class BeanFactoryAwareFunctionRegistryMultiInOutTests { System.out.println(result); } + @Test + public void testMultiInputWithPojoConversion() { + FunctionCatalog catalog = this.configureCatalog(); + Function, Flux>, Flux> multiInputFunction = + catalog.lookup("thomas", "application/json"); + CartEvent carEvent = new CartEvent(); + carEvent.setCarEvent("carEvent"); + Flux carEventStream = Flux.just(carEvent); + + CheckoutEvent checkoutEvent = new CheckoutEvent(); + checkoutEvent.setCheckoutEvent("checkoutEvent"); + Flux checkoutEventStream = Flux.just(checkoutEvent); + + Tuple2, Flux> streams = Tuples.of(carEventStream, checkoutEventStream); + + List result = multiInputFunction.apply(streams).collectList().block(); + System.out.println(result); + } + @SuppressWarnings("unused") @Test @Ignore @@ -380,6 +399,56 @@ public class BeanFactoryAwareFunctionRegistryMultiInOutTests { return new Flux[] { repeated, sum }; }; } + + @Bean + public Function, Flux>, Flux> thomas() { + return tuple -> { + Flux cartEventStream = tuple.getT1(); + Flux checkoutEventStream = tuple.getT2(); + + return Flux.zip(cartEventStream, checkoutEventStream, (cartEvent, checkoutEvent) -> { + OrderEvent oe = new OrderEvent(); + oe.setOrderEvent(cartEvent.toString() + "- " + checkoutEvent.toString()); + return oe; + }); + }; + } + } + + public static class CartEvent { + private String carEvent; + + public String getCarEvent() { + return carEvent; + } + + public void setCarEvent(String carEvent) { + this.carEvent = carEvent; + } + } + + public static class CheckoutEvent { + private String checkoutEvent; + + public String getCheckoutEvent() { + return checkoutEvent; + } + + public void setCheckoutEvent(String checkoutEvent) { + this.checkoutEvent = checkoutEvent; + } + } + + public static class OrderEvent { + private String orderEvent; + + public String getOrderEvent() { + return orderEvent; + } + + public void setOrderEvent(String orderEvent) { + this.orderEvent = orderEvent; + } } public static class Person { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java index ad923c3cd..049e570d1 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java @@ -66,7 +66,7 @@ public class BeanFactoryAwarePojoFunctionRegistryTests { assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE"); Function, Message> f2messageReturned = catalog.lookup("myFunction", "application/json"); - assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\""); + assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("MESSAGE"); Function, Flux> f3 = catalog.lookup("myFunction"); assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO"); @@ -89,7 +89,7 @@ public class BeanFactoryAwarePojoFunctionRegistryTests { assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE"); Function, Message> f2messageReturned = catalog.lookup("myFunctionLike", "application/json"); - assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\""); + assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("MESSAGE"); Function, Flux> f3 = catalog.lookup("myFunctionLike"); assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO"); diff --git a/spring-cloud-function-samples/function-sample-aws/pom.xml b/spring-cloud-function-samples/function-sample-aws/pom.xml index e7ed58deb..c4c21f111 100644 --- a/spring-cloud-function-samples/function-sample-aws/pom.xml +++ b/spring-cloud-function-samples/function-sample-aws/pom.xml @@ -26,7 +26,6 @@ 1.0.17.RELEASE 2.0.2 3.0.1.BUILD-SNAPSHOT - example.Config @@ -34,16 +33,18 @@ org.springframework.cloud spring-cloud-function-adapter-aws - - org.springframework.cloud - spring-cloud-starter-function-webflux - provided - + + + + + + + com.amazonaws aws-lambda-java-events ${aws-lambda-events.version} - provided + com.amazonaws diff --git a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Config.java b/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Config.java deleted file mode 100644 index aa3b6868c..000000000 --- a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Config.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package example; - -import java.util.function.Function; - -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.function.context.FunctionRegistration; -import org.springframework.cloud.function.context.FunctionType; -import org.springframework.cloud.function.context.FunctionalSpringApplication; -import org.springframework.context.ApplicationContextInitializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.support.GenericApplicationContext; - -// @checkstyle:off -@SpringBootApplication -@EnableConfigurationProperties(Properties.class) -public class Config implements ApplicationContextInitializer { - - private Properties props; - - public Config() { - } - - public Config(Properties props) { - this.props = props; - } - - public static void main(String[] args) throws Exception { - FunctionalSpringApplication.run(Config.class, args); - } - - @Bean - public Function function() { - return value -> new Bar( - value.uppercase() + (this.props.getFoo() != null ? "-" + this.props - .getFoo() : "")); - } - - @Override - public void initialize(GenericApplicationContext context) { - Properties properties = new Properties(); - this.props = properties; - context.registerBean(Properties.class, () -> properties); - context.registerBean("function", FunctionRegistration.class, - () -> new FunctionRegistration>(function()) - .type(FunctionType.from(Foo.class).to(Bar.class).getType())); - } - -} -// @checkstyle:on - -class Foo { - - private String value; - - Foo() { - } - - Foo(String value) { - this.value = value; - } - - public String lowercase() { - return this.value.toLowerCase(); - } - - public String uppercase() { - return this.value.toUpperCase(); - } - - public String getValue() { - return this.value; - } - - public void setValue(String value) { - this.value = value; - } - -} - -class Bar { - - private String value; - - Bar() { - } - - Bar(String value) { - this.value = value; - } - - public String getValue() { - return this.value; - } - - public void setValue(String value) { - this.value = value; - } - -} diff --git a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/FunctionConfiguration.java b/spring-cloud-function-samples/function-sample-aws/src/main/java/example/FunctionConfiguration.java new file mode 100644 index 000000000..6bc7dc9d1 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws/src/main/java/example/FunctionConfiguration.java @@ -0,0 +1,24 @@ +package example; + +import java.util.function.Function; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class FunctionConfiguration { + + /* + * You need this main method or explicit example.FunjctionConfiguration + * in the POM to ensure boot plug-in makes the correct entry + */ + public static void main(String[] args) { + SpringApplication.run(FunctionConfiguration.class, args); + } + + @Bean + public Function uppercase() { + return value -> value.toUpperCase(); + } +} \ No newline at end of file diff --git a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Handler.java b/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Handler.java deleted file mode 100644 index cabcf7dc1..000000000 --- a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Handler.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package example; - -import org.springframework.cloud.function.adapter.aws.SpringBootRequestHandler; - -/** - * @author Dave Syer - * - */ -public class Handler extends SpringBootRequestHandler { - -} diff --git a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Properties.java b/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Properties.java deleted file mode 100644 index 9c6ce2b5b..000000000 --- a/spring-cloud-function-samples/function-sample-aws/src/main/java/example/Properties.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package example; - -import org.springframework.boot.context.properties.ConfigurationProperties; - - -// @checkstyle:off -@ConfigurationProperties("app") -public class Properties { - - public String foo; - - public String getFoo() { - return this.foo; - } - - public void setFoo(String foo) { - this.foo = foo; - } - -} - -// @checkstyle:on diff --git a/spring-cloud-function-samples/function-sample-aws/src/test/java/example/MapTests.java b/spring-cloud-function-samples/function-sample-aws/src/test/java/example/MapTests.java index df2835371..440c6b565 100644 --- a/spring-cloud-function-samples/function-sample-aws/src/test/java/example/MapTests.java +++ b/spring-cloud-function-samples/function-sample-aws/src/test/java/example/MapTests.java @@ -18,10 +18,6 @@ package example; import org.junit.Test; -import org.springframework.cloud.function.adapter.aws.SpringBootRequestHandler; - -import static org.assertj.core.api.Assertions.assertThat; - /** * @author Dave Syer * @@ -30,16 +26,6 @@ public class MapTests { @Test public void test() { - Bar result = new Config(new Properties()).function().apply(new Foo("foo")); - assertThat(result.getValue()).isEqualTo("FOO"); - } - @Test - public void start() throws Exception { - SpringBootRequestHandler handler = new SpringBootRequestHandler<>( - Config.class); - handler.handleRequest(new Foo("foo"), null); - handler.close(); } - }