diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java index 2a756d50a..e3995abfd 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,22 +18,39 @@ package org.springframework.cloud.function.adapter.aws; import java.util.function.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.springframework.cloud.function.web.source.DestinationResolver; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +/** + * Implementation of {@link DestinationResolver}for AWS Lambda which resolves destination + * from `lambda-runtime-aws-request-id` message header. + * + * @author Dave Syer + * @author Oleg Zhurakousky + * + */ public class LambdaDestinationResolver implements DestinationResolver { + private static Log logger = LogFactory.getLog(LambdaDestinationResolver.class); + @Override public String destination(Supplier supplier, String name, Object value) { + String destination = "unknown"; if (value instanceof Message) { Message message = (Message) value; MessageHeaders headers = message.getHeaders(); if (headers.containsKey("lambda-runtime-aws-request-id")) { - return (String) headers.get("lambda-runtime-aws-request-id"); + destination = (String) headers.get("lambda-runtime-aws-request-id"); } } - return "unknown"; + if (logger.isDebugEnabled()) { + logger.debug("Lambda destination resolved to: " + destination); + } + return destination; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index fbbd7b9b4..b23967541 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -319,7 +319,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect registrationsByFunction.putIfAbsent(function, registration); registrationsByName.putIfAbsent(name, registration); - function = new FunctionInvocationWrapper(function, currentFunctionType, name, acceptedOutputTypes); + function = new FunctionInvocationWrapper(function, currentFunctionType, name, names.length > 1 ? new String[] {} : acceptedOutputTypes); if (originFunctionType == null) { originFunctionType = currentFunctionType; @@ -447,6 +447,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect this.headersField.setAccessible(true); } + public String getFunctionDefinition() { + return this.functionDefinition; + } + @Override public void accept(Object input) { this.doApply(input, true, null); @@ -505,6 +509,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect return target; } + @Override + public String toString() { + return "definition: " + this.functionDefinition + "; type: " + this.functionType; + } + @SuppressWarnings({"rawtypes", "unchecked"}) private Object invokeFunction(Object input) { Object invocationResult = null; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/GsonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/GsonMapper.java index 58a3ce12c..47c4a185e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/GsonMapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/GsonMapper.java @@ -27,7 +27,7 @@ import com.google.gson.JsonElement; * @author Dave Syer * @author Oleg Zhurakousky */ -public class GsonMapper implements JsonMapper { +public class GsonMapper extends JsonMapper { private final Gson gson; @@ -65,7 +65,11 @@ public class GsonMapper implements JsonMapper { @Override public byte[] toJson(Object value) { - return this.gson.toJson(value).getBytes(StandardCharsets.UTF_8); + byte[] jsonBytes = super.toJson(value); + if (jsonBytes == null) { + jsonBytes = this.gson.toJson(value).getBytes(StandardCharsets.UTF_8); + } + return jsonBytes; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java index 7865e08f6..fde6f87b0 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JacksonMapper.java @@ -23,12 +23,16 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @author Dave Syer * @author Oleg Zhurakousky */ -public class JacksonMapper implements JsonMapper { +public class JacksonMapper extends JsonMapper { + + private static Log logger = LogFactory.getLog(JsonMapper.class); private final ObjectMapper mapper; @@ -58,20 +62,23 @@ public class JacksonMapper implements JsonMapper { } } catch (Exception e) { - //ignore and let other converters have a chance + logger.warn("Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here", e); } return convertedValue; } @Override public byte[] toJson(Object value) { - try { - return this.mapper.writeValueAsBytes(value); + byte[] jsonBytes = super.toJson(value); + if (jsonBytes == null) { + try { + jsonBytes = this.mapper.writeValueAsBytes(value); + } + catch (Exception e) { + //ignore and let other converters have a chance + } } - catch (Exception e) { - //ignore and let other converters have a chance - } - return null; + return jsonBytes; } @Override diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java index f5a814d5a..bd65b15fd 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,35 +17,41 @@ package org.springframework.cloud.function.json; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONObject; + import org.springframework.core.ResolvableType; /** * @author Dave Syer * @author Oleg Zhurakousky */ -public interface JsonMapper { +public abstract class JsonMapper { + + private static Log logger = LogFactory.getLog(JsonMapper.class); /** - * @param type for list arguments + * @param type for list arguments * @param json JSON input * @param type type of list arguments * @return list of elements * @deprecated since v2.0 in favor of {@link #toObject(String, Type)} */ @Deprecated - default List toList(String json, Class type) { + List toList(String json, Class type) { Type actualType = (json.startsWith("[") && !List.class.isAssignableFrom(type)) - ? ResolvableType.forClassWithGenerics(ArrayList.class, (Class) type) - .getType() + ? ResolvableType.forClassWithGenerics(ArrayList.class, (Class) type).getType() : type; return toObject(json, actualType); } /** - * @param return type + * @param return type * @param json JSON input * @param type type * @return object @@ -53,24 +59,39 @@ public interface JsonMapper { * @deprecated since v3.0.4 in favor of {@link #fromJson(Object, Type)} */ @Deprecated - T toObject(String json, Type type); + abstract T toObject(String json, Type type); - T fromJson(Object json, Type type); + public abstract T fromJson(Object json, Type type); - byte[] toJson(Object value); + public byte[] toJson(Object value) { + if (value instanceof String) { + try { + new JSONObject((String) value); + if (logger.isDebugEnabled()) { + logger.debug( + "String already represents JSON. Skipping conversion in favor of 'getBytes(StandardCharsets.UTF_8'."); + } + return ((String) value).getBytes(StandardCharsets.UTF_8); + } + catch (Exception ex) { + // ignore + } + } + return null; + } /** - * @param type for list arguments + * @param type for list arguments * @param json JSON input * @param type type of list arguments * @return single object * @deprecated since v2.0 in favor of {@link #toObject(String, Type)} */ @Deprecated - default T toSingle(String json, Class type) { + T toSingle(String json, Class type) { return toObject(json, type); } - String toString(Object value); + public abstract String toString(Object value); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/utils/JsonMapperTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/utils/JsonMapperTests.java index cbf997ab9..e69c558a5 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/utils/JsonMapperTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/utils/JsonMapperTests.java @@ -56,7 +56,7 @@ public class JsonMapperTests { @Test public void vanillaArray() { String json = "[{\"value\":\"foo\"},{\"value\":\"foo\"}]"; - List list = this.mapper.toObject(json, + List list = this.mapper.fromJson(json, ResolvableType.forClassWithGenerics(List.class, Foo.class).getType()); assertThat(list).hasSize(2); assertThat(list.get(0).getValue()).isEqualTo("foo"); @@ -65,7 +65,7 @@ public class JsonMapperTests { @Test public void intArray() { - List list = this.mapper.toObject("[123,456]", + List list = this.mapper.fromJson("[123,456]", ResolvableType.forClassWithGenerics(List.class, Integer.class).getType()); assertThat(list).hasSize(2); assertThat(list.get(0)).isEqualTo(123); @@ -73,7 +73,7 @@ public class JsonMapperTests { @Test public void emptyArray() { - List list = this.mapper.toObject("[]", + List list = this.mapper.fromJson("[]", ResolvableType.forClassWithGenerics(List.class, Foo.class).getType()); assertThat(list).hasSize(0); } @@ -81,20 +81,27 @@ public class JsonMapperTests { @Test public void vanillaObject() { String json = "{\"value\":\"foo\"}"; - Foo foo = this.mapper.toObject(json, Foo.class); + Foo foo = this.mapper.fromJson(json, Foo.class); assertThat(foo.getValue()).isEqualTo("foo"); assertThat(this.mapper.toString(foo)).isEqualTo(json); } + @Test + public void stringRepresentingJson() { + String json = "{\"value\":\"foo\"}"; + byte[] bytes = this.mapper.toJson(json); + assertThat(new String(bytes)).isEqualTo(json); + } + @Test public void intValue() { - int foo = this.mapper.toObject("123", Integer.class); + int foo = this.mapper.fromJson("123", Integer.class); assertThat(foo).isEqualTo(123); } @Test public void empty() { - Foo foo = this.mapper.toObject("{}", Foo.class); + Foo foo = this.mapper.fromJson("{}", Foo.class); assertThat(foo.getValue()).isNull(); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index 08c0e777d..5a22c8725 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,8 +127,10 @@ public class RequestProcessor { public Mono> post(FunctionWrapper wrapper, ServerWebExchange exchange) { - return Mono.from(body(wrapper.handler(), exchange)) + Mono> responseEntity = Mono.from(body(wrapper.handler(), exchange)) .flatMap(body -> response(wrapper, body, false)); + + return responseEntity; } public Mono> post(FunctionWrapper wrapper, String body, @@ -149,7 +151,7 @@ public class RequestProcessor { jsonType = ResolvableType.forClassWithGenerics((Class) jsonType, (Class) itemType).getType(); } - input = this.mapper.toObject((String) input, jsonType); + input = this.mapper.fromJson((String) input, jsonType); } else { input = this.converter.convert(function, (String) input); @@ -178,7 +180,6 @@ public class RequestProcessor { private Mono> response(FunctionWrapper request, Object handler, Publisher result, Boolean single, boolean getter) { - BodyBuilder builder = ResponseEntity.ok(); if (this.inspector.isMessage(handler)) { result = Flux.from(result) @@ -389,7 +390,11 @@ public class RequestProcessor { exchange.getLogPrefix() + "0..1 [" + elementType + "]"); } Mono mono = reader.readMono(actualType, elementType, request, - response, readHints); + response, readHints).doOnNext(v -> { + if (logger.isDebugEnabled()) { + logger.debug("received: " + v); + } + }); mono = mono.onErrorResume( ex -> Mono.error(handleReadError(bodyParam, ex))); if (isBodyRequired) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 7aeaea17f..622230ab2 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java index 1890cadd5..a7c32f9bb 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java @@ -24,6 +24,7 @@ import org.springframework.cloud.function.context.FunctionProperties; /** * @author Dave Syer + * @author Oleg Zhurakousky * */ @ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".web.export") @@ -149,6 +150,11 @@ public class ExporterProperties { */ private String name; + /** + * Content type to use when serializing source's output for transport (default 'application/json`). + */ + private String contentType = "application/json"; + public String getName() { return this.name; } @@ -169,6 +175,13 @@ public class ExporterProperties { return this.headers; } + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java index c219f02be..accf40651 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,6 +58,8 @@ public class SupplierExporter implements SmartLifecycle { private final String supplier; + private final String contentType; + private volatile boolean running; private volatile boolean ok = true; @@ -70,14 +72,15 @@ public class SupplierExporter implements SmartLifecycle { SupplierExporter(RequestBuilder requestBuilder, DestinationResolver destinationResolver, FunctionCatalog catalog, - WebClient client, ExporterProperties props) { + WebClient client, ExporterProperties exporterProperties) { this.requestBuilder = requestBuilder; this.destinationResolver = destinationResolver; this.catalog = catalog; this.client = client; - this.debug = props.isDebug(); - this.autoStartup = props.isAutoStartup(); - this.supplier = props.getSink().getName(); + this.debug = exporterProperties.isDebug(); + this.autoStartup = exporterProperties.isAutoStartup(); + this.supplier = exporterProperties.getSink().getName(); + this.contentType = exporterProperties.getSink().getContentType(); } @Override @@ -93,7 +96,7 @@ public class SupplierExporter implements SmartLifecycle { boolean suppliersPresent = false; for (String name : names) { - Supplier> supplier = this.catalog.lookup(Supplier.class, name); + Supplier> supplier = this.catalog.lookup(name, this.contentType); if (supplier == null) { logger.warn("No such Supplier: " + name); continue; @@ -163,8 +166,7 @@ public class SupplierExporter implements SmartLifecycle { private Flux forward(Supplier> supplier, String name) { return Flux.from(supplier.get()).flatMap(value -> { - String destination = this.destinationResolver.destination(supplier, name, - value); + String destination = this.destinationResolver.destination(supplier, name, value); if (this.debug) { logger.info("Posting to: " + destination); } @@ -178,9 +180,17 @@ public class SupplierExporter implements SmartLifecycle { Message message = (Message) value; body = message.getPayload(); } + if (this.debug) { + logger.debug("Sending BODY as type: " + body.getClass().getName()); + } Mono result = this.client.post().uri(uri) .headers(headers -> headers(headers, destination, value)).bodyValue(body) - .exchange(); + .exchange() + .doOnNext(response -> { + if (this.debug) { + logger.debug("Response STATUS: " + response.statusCode()); + } + }); if (this.debug) { result = result.log(); } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java index a66f81d70..52e3847d5 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java @@ -52,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; "spring.cloud.function.web.export.sink.url=http://localhost:${my.port}", "spring.cloud.function.web.export.source.url=http://localhost:${my.port}", "spring.cloud.function.web.export.sink.name=origin|uppercase", + "spring.cloud.function.web.export.sink.contentType=text/plain", "spring.cloud.function.web.export.debug=true" }) public class FunctionalExporterTests { diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java index 4b7a9dff1..f7709cbd5 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java @@ -85,8 +85,8 @@ public class FunctionAutoConfigurationIntegrationTests { } // It completed assertThat(this.forwarder.isOk()).isTrue(); - assertThat(this.app.inputs).contains("HELLO"); - assertThat(this.app.inputs).contains("WORLD"); + assertThat(this.app.inputs).contains("\"HELLO\""); + assertThat(this.app.inputs).contains("\"WORLD\""); } @EnableAutoConfiguration