GH-256, Add AWS support for Consumer's and Supplier's

Added support for Consumer and Supplier functions when executing within AWS Lambda.

It appears that this was previously supported but was removed in 083d5e for some reason.

Improve Consumer support for AWS API Gateway

Consumer type functions, by definition, do not return anything; i.e. they do not have a response.  However when executing within a Lambda + API Gateway environment, the Lambda must still return a 200-level status response to signal the API Gateway that the function executed succesfully.   I added the logic for this.

Improve Supplier support for AWS API Gateway

Supplier functions, by defnition, do not take input; in the case of API Gateway calls this type of execution passes a null body, which fails to deserialize and throws an exception.  I added logic that skips the deserialization of the body in the case it is a Supplier.
This commit is contained in:
Semyon Fishman
2019-02-13 13:05:31 -05:00
committed by Oleg Zhurakousky
parent 94106dba48
commit 8c4c1aefde
4 changed files with 130 additions and 18 deletions

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.function.adapter.aws;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -124,4 +125,13 @@ public class SpringBootApiGatewayRequestHandler extends
}
}
@Override
public Object handleRequest(APIGatewayProxyRequestEvent event, Context context) {
Object response = super.handleRequest(event, context);
if (returnsOutput())
return response;
else
return new APIGatewayProxyResponseEvent()
.withStatusCode(HttpStatus.OK.value());
}
}

View File

@@ -44,7 +44,7 @@ public class SpringBootRequestHandler<E, O> extends SpringFunctionInitializer
@Override
public Object handleRequest(E event, Context context) {
initialize();
Object input = convertEvent(event);
Object input = acceptsInput() ? convertEvent(event) : "";
Publisher<?> output = apply(extract(input));
return result(input, output);
}

View File

@@ -20,10 +20,12 @@ import java.io.Closeable;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.jar.Manifest;
import org.apache.commons.logging.Log;
@@ -50,6 +52,10 @@ public class SpringFunctionInitializer implements Closeable {
private Function<Publisher<?>, Publisher<?>> function;
private Consumer<Publisher<?>> consumer;
private Supplier<Publisher<?>> supplier;
private AtomicBoolean initialized = new AtomicBoolean();
@Autowired(required = false)
@@ -130,27 +136,83 @@ public class SpringFunctionInitializer implements Closeable {
SpringApplication builder = springApplication();
ConfigurableApplicationContext context = builder.run();
context.getAutowireCapableBeanFactory().autowireBean(this);
String name = context.getEnvironment().getProperty("function.name");
if (name == null) {
name = "function";
}
this.context = context;
if (this.catalog == null) {
if (context.containsBean(name)) {
this.function = context.getBean(name, Function.class);
}
initFunctionConsumerOrSupplierFromContext();
}
else {
Set<String> functionNames = this.catalog.getNames(Function.class);
if (functionNames.size() == 1) {
this.function = this.catalog.lookup(Function.class,
functionNames.iterator().next());
initFunctionConsumerOrSupplierFromCatalog();
}
}
private String resolveName(Class<?> type) {
String functionName = context.getEnvironment().getProperty("function.name");
if (functionName != null)
return functionName;
else if (type.isAssignableFrom(Function.class))
return "function";
else if (type.isAssignableFrom(Consumer.class))
return "consumer";
else if (type.isAssignableFrom(Supplier.class))
return "supplier";
throw new IllegalStateException("Unknown type " + type);
}
private void initFunctionConsumerOrSupplierFromContext() {
String name = resolveName(Function.class);
if (context.containsBean(name) && context.getBean(name) instanceof Function) {
this.function = context.getBean(name, Function.class);
return;
}
name = resolveName(Consumer.class);
if (context.containsBean(name) && context.getBean(name) instanceof Consumer) {
this.consumer = context.getBean(name, Consumer.class);
return;
}
name = resolveName(Supplier.class);
if (context.containsBean(name) && context.getBean(name) instanceof Supplier) {
this.supplier = context.getBean(name, Supplier.class);
return;
}
}
private void initFunctionConsumerOrSupplierFromCatalog() {
String name = resolveName(Function.class);
this.function = this.catalog.lookup(Function.class, name);
if (this.function != null)
return;
name = resolveName(Consumer.class);
this.consumer = this.catalog.lookup(Consumer.class, name);
if (this.consumer != null)
return;
name = resolveName(Supplier.class);
this.supplier = this.catalog.lookup(Supplier.class, name);
if (this.supplier != null)
return;
if (this.catalog.size() == 1) {
Iterator<String> names = this.catalog.getNames(Function.class).iterator();
if (names.hasNext()) {
this.function = this.catalog.lookup(Function.class, names.next());
return;
}
else {
this.function = this.catalog.lookup(Function.class, name);
names = this.catalog.getNames(Consumer.class).iterator();
if (names.hasNext()) {
this.consumer = this.catalog.lookup(Consumer.class, names.next());
return;
}
names = this.catalog.getNames(Supplier.class).iterator();
if (names.hasNext()) {
this.supplier = this.catalog.lookup(Supplier.class, names.next());
return;
}
}
this.context = context;
}
private SpringApplication springApplication() {
@@ -169,13 +231,35 @@ public class SpringFunctionInitializer implements Closeable {
}
protected Object function() {
return this.function;
if (this.function != null)
return this.function;
else if (this.consumer != null)
return this.consumer;
else if (this.supplier != null)
return this.supplier;
else
return null;
}
protected boolean acceptsInput() {
return !this.inspector.getInputType(function()).equals(Void.class);
}
protected boolean returnsOutput() {
return !this.inspector.getOutputType(function()).equals(Void.class);
}
protected Publisher<?> apply(Publisher<?> input) {
if (this.function != null) {
return Flux.from(this.function.apply(input));
}
if (this.consumer != null) {
this.consumer.accept(input);
return Flux.empty();
}
if (this.supplier != null) {
return this.supplier.get();
}
throw new IllegalStateException("No function defined");
}

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.function.adapter.aws;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.After;
@@ -102,6 +103,14 @@ public class SpringFunctionInitializerTests {
assertThat(result.toStream().collect(Collectors.toList())).isEmpty();
}
@Test
public void supplierCatalog() {
initializer = new SpringFunctionInitializer(SupplierConfig.class);
initializer.initialize();
Flux<?> result = Flux.from(initializer.apply(Flux.empty()));
assertThat(result.blockFirst()).isInstanceOf(Bar.class);
}
@Configuration
protected static class FluxFunctionConfig {
@@ -161,6 +170,15 @@ public class SpringFunctionInitializerTests {
}
@Configuration
@Import(ContextFunctionCatalogAutoConfiguration.class)
protected static class SupplierConfig {
@Bean
public Supplier<Bar> supplier() {
return () -> new Bar();
}
}
@Configuration
@Import(ContextFunctionCatalogAutoConfiguration.class)
protected static class ConsumerConfig {