Polished adapter initializers consolidation effort

- Added ability to retrieve input type from FunctionRegistration (if available) in AbstractSpringFunctionAdapterInitializer
- Removed azure/AzureSpringFunctionInitializer and aws/SpringFunctionInitializer
- Added additional tests in AWS and Azure modules
- See 0189c578ef for additional info
This commit is contained in:
Oleg Zhurakousky
2019-02-25 19:37:34 +01:00
parent 41f66e9b48
commit cdca44f714
11 changed files with 102 additions and 558 deletions

View File

@@ -26,7 +26,6 @@ import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
@@ -37,6 +36,7 @@ import static java.util.stream.Collectors.toList;
* @param <O> response type
* @author Mark Fisher
* @author Halvdan Hoem Grelland
* @author Oleg Zhurakousky
*/
public class SpringBootKinesisEventHandler<E, O>
extends SpringBootRequestHandler<KinesisEvent, O> {
@@ -44,9 +44,6 @@ public class SpringBootKinesisEventHandler<E, O>
@Autowired
private ObjectMapper mapper;
@Autowired
private FunctionInspector inspector;
public SpringBootKinesisEventHandler() {
super();
}
@@ -65,7 +62,7 @@ public class SpringBootKinesisEventHandler<E, O>
protected Object convertEvent(KinesisEvent event) {
List<E> payloads = deserializePayloads(event.getRecords());
if (functionAcceptsMessage()) {
if (getInspector().isMessage(function())) {
return wrapInMessages(payloads);
}
else {
@@ -92,9 +89,4 @@ public class SpringBootKinesisEventHandler<E, O>
throw new IllegalStateException("Cannot convert event", e);
}
}
private boolean functionAcceptsMessage() {
return this.inspector.isMessage(function());
}
}

View File

@@ -25,12 +25,15 @@ import com.amazonaws.services.lambda.runtime.RequestHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer;
/**
* @param <E> event type
* @param <O> result types
* @author Mark Fisher
* @author Oleg Zhurakousky
*/
public class SpringBootRequestHandler<E, O> extends SpringFunctionInitializer
public class SpringBootRequestHandler<E, O> extends AbstractSpringFunctionAdapterInitializer<Context>
implements RequestHandler<E, Object> {
public SpringBootRequestHandler(Class<?> configurationClass) {
@@ -43,21 +46,31 @@ public class SpringBootRequestHandler<E, O> extends SpringFunctionInitializer
@Override
public Object handleRequest(E event, Context context) {
initialize();
initialize(context);
Object input = acceptsInput() ? convertEvent(event) : "";
Publisher<?> output = apply(extract(input));
return result(input, output);
}
private Object result(Object input, Publisher<?> output) {
@SuppressWarnings("unchecked")
@Override
protected <T> T result(Object input, Publisher<?> output) {
List<O> result = new ArrayList<>();
for (Object value : Flux.from(output).toIterable()) {
result.add(convertOutput(value));
}
if (isSingleValue(input) && result.size() == 1) {
return result.get(0);
return (T) result.get(0);
}
return result;
return (T) result;
}
protected boolean acceptsInput() {
return !this.getInspector().getInputType(function()).equals(Void.class);
}
protected boolean returnsOutput() {
return !this.getInspector().getOutputType(function()).equals(Void.class);
}
private boolean isSingleValue(Object input) {

View File

@@ -49,14 +49,6 @@ public class SpringBootStreamHandler extends AbstractSpringFunctionAdapterInitia
super(configurationClass);
}
@Override
protected void initialize(Context context) {
super.initialize(context);
if (this.mapper == null) {
this.mapper = new ObjectMapper();
}
}
@Override
public void handleRequest(InputStream input, OutputStream output, Context context)
throws IOException {
@@ -66,6 +58,15 @@ public class SpringBootStreamHandler extends AbstractSpringFunctionAdapterInitia
this.mapper.writeValue(output, result(value, flux));
}
@Override
protected void initialize(Context context) {
super.initialize(context);
if (this.mapper == null) {
this.mapper = new ObjectMapper();
}
}
private Flux<?> extract(Object input) {
if (input instanceof Collection) {
return Flux.fromIterable((Iterable<?>) input);

View File

@@ -1,280 +0,0 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.adapter.aws;
import java.io.Closeable;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.jar.Manifest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.ClassUtils;
/**
* @author Dave Syer
* @author Semyon Fishman
* @deprecated as of 2.1 in favor of {@link AbstractSpringFunctionAdapterInitializer}.
* It is no longer used by the framework and only exists for avoiding potential regressions.
*/
@Deprecated
public class SpringFunctionInitializer implements Closeable {
private static Log logger = LogFactory.getLog(SpringFunctionInitializer.class);
private final Class<?> configurationClass;
private Function<Publisher<?>, Publisher<?>> function;
private Consumer<Publisher<?>> consumer;
private Supplier<Publisher<?>> supplier;
private AtomicBoolean initialized = new AtomicBoolean();
@Autowired(required = false)
private FunctionInspector inspector;
@Autowired(required = false)
private FunctionCatalog catalog;
private ConfigurableApplicationContext context;
public SpringFunctionInitializer(Class<?> configurationClass) {
this.configurationClass = configurationClass;
}
public SpringFunctionInitializer() {
this(getStartClass());
}
private static Class<?> getStartClass() {
ClassLoader classLoader = SpringFunctionInitializer.class.getClassLoader();
if (System.getenv("MAIN_CLASS") != null) {
return ClassUtils.resolveClassName(System.getenv("MAIN_CLASS"), classLoader);
}
try {
Class<?> result = getStartClass(
Collections.list(classLoader.getResources("META-INF/MANIFEST.MF")));
if (result == null) {
result = getStartClass(Collections
.list(classLoader.getResources("meta-inf/manifest.mf")));
}
logger.info("Main class: " + result);
return result;
}
catch (Exception ex) {
logger.error("Failed to find main class", ex);
return null;
}
}
private static Class<?> getStartClass(List<URL> list) {
logger.info("Searching manifests: " + list);
for (URL url : list) {
try {
logger.info("Searching manifest: " + url);
InputStream inputStream = url.openStream();
try {
Manifest manifest = new Manifest(inputStream);
String startClass = manifest.getMainAttributes()
.getValue("Start-Class");
if (startClass != null) {
return ClassUtils.forName(startClass,
SpringFunctionInitializer.class.getClassLoader());
}
}
finally {
inputStream.close();
}
}
catch (Exception ex) {
}
}
return null;
}
@Override
public void close() {
if (this.context != null) {
this.context.close();
}
}
protected void initialize() {
if (!this.initialized.compareAndSet(false, true)) {
return;
}
logger.info("Initializing: " + this.configurationClass);
SpringApplication builder = springApplication();
ConfigurableApplicationContext context = builder.run();
context.getAutowireCapableBeanFactory().autowireBean(this);
this.context = context;
if (this.catalog == null) {
initFunctionConsumerOrSupplierFromContext();
}
else {
initFunctionConsumerOrSupplierFromCatalog();
}
}
private String resolveName(Class<?> type) {
String functionName = context.getEnvironment().getProperty("function.name");
if (functionName != null) {
return functionName;
}
else if (type.isAssignableFrom(Function.class)) {
return "function";
}
else if (type.isAssignableFrom(Consumer.class)) {
return "consumer";
}
else if (type.isAssignableFrom(Supplier.class)) {
return "supplier";
}
throw new IllegalStateException("Unknown type " + type);
}
@SuppressWarnings("unchecked")
private void initFunctionConsumerOrSupplierFromContext() {
String name = resolveName(Function.class);
if (context.containsBean(name) && context.getBean(name) instanceof Function) {
this.function = context.getBean(name, Function.class);
return;
}
name = resolveName(Consumer.class);
if (context.containsBean(name) && context.getBean(name) instanceof Consumer) {
this.consumer = context.getBean(name, Consumer.class);
return;
}
name = resolveName(Supplier.class);
if (context.containsBean(name) && context.getBean(name) instanceof Supplier) {
this.supplier = context.getBean(name, Supplier.class);
return;
}
}
private void initFunctionConsumerOrSupplierFromCatalog() {
String name = resolveName(Function.class);
this.function = this.catalog.lookup(Function.class, name);
if (this.function != null) {
return;
}
name = resolveName(Consumer.class);
this.consumer = this.catalog.lookup(Consumer.class, name);
if (this.consumer != null) {
return;
}
name = resolveName(Supplier.class);
this.supplier = this.catalog.lookup(Supplier.class, name);
if (this.supplier != null) {
return;
}
if (this.catalog.size() == 1) {
Iterator<String> names = this.catalog.getNames(Function.class).iterator();
if (names.hasNext()) {
this.function = this.catalog.lookup(Function.class, names.next());
return;
}
names = this.catalog.getNames(Consumer.class).iterator();
if (names.hasNext()) {
this.consumer = this.catalog.lookup(Consumer.class, names.next());
return;
}
names = this.catalog.getNames(Supplier.class).iterator();
if (names.hasNext()) {
this.supplier = this.catalog.lookup(Supplier.class, names.next());
return;
}
}
}
private SpringApplication springApplication() {
Class<?> sourceClass = this.configurationClass;
SpringApplication application = new org.springframework.cloud.function.context.FunctionalSpringApplication(
sourceClass);
application.setWebApplicationType(WebApplicationType.NONE);
return application;
}
protected Class<?> getInputType() {
if (this.inspector != null) {
return this.inspector.getInputType(function());
}
return Object.class;
}
protected Object function() {
if (this.function != null) {
return this.function;
}
else if (this.consumer != null) {
return this.consumer;
}
else if (this.supplier != null) {
return this.supplier;
}
return null;
}
protected boolean acceptsInput() {
return !this.inspector.getInputType(function()).equals(Void.class);
}
protected boolean returnsOutput() {
return !this.inspector.getOutputType(function()).equals(Void.class);
}
protected Publisher<?> apply(Publisher<?> input) {
if (this.function != null) {
return Flux.from(this.function.apply(input));
}
if (this.consumer != null) {
this.consumer.accept(input);
return Flux.empty();
}
if (this.supplier != null) {
return this.supplier.get();
}
throw new IllegalStateException("No function defined");
}
}

View File

@@ -34,6 +34,9 @@ import org.springframework.messaging.support.GenericMessage;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
*/
public class SpringBootApiGatewayRequestHandlerTests {
private SpringBootApiGatewayRequestHandler handler;
@@ -41,8 +44,6 @@ public class SpringBootApiGatewayRequestHandlerTests {
@Test
public void functionBean() {
this.handler = new SpringBootApiGatewayRequestHandler(FunctionConfig.class);
this.handler.initialize();
APIGatewayProxyRequestEvent request = new APIGatewayProxyRequestEvent();
request.setBody("{\"value\":\"foo\"}");
@@ -58,8 +59,6 @@ public class SpringBootApiGatewayRequestHandlerTests {
public void functionMessageBean() {
this.handler = new SpringBootApiGatewayRequestHandler(
FunctionMessageConfig.class);
this.handler.initialize();
APIGatewayProxyRequestEvent request = new APIGatewayProxyRequestEvent();
request.setBody("{\"value\":\"foo\"}");

View File

@@ -111,7 +111,6 @@ public class SpringBootKinesisEventHandlerTests {
@Test
public void functionBeanHandlesKinesisEvent() throws Exception {
this.handler = new SpringBootKinesisEventHandler<>(FunctionConfig.class);
this.handler.initialize();
KinesisEvent event = asKinesisEvent(singletonList(new Foo("foo")));
@@ -123,7 +122,6 @@ public class SpringBootKinesisEventHandlerTests {
@Test
public void functionBeanHandlesAggregatedKinesisEvent() throws Exception {
this.handler = new SpringBootKinesisEventHandler<>(FunctionConfig.class);
this.handler.initialize();
List<Foo> events = asList(new Foo("foo"), new Foo("bar"), new Foo("baz"));
KinesisEvent aggregatedEvent = asAggregatedKinesisEvent(events);
@@ -137,7 +135,6 @@ public class SpringBootKinesisEventHandlerTests {
@Test
public void functionMessageBean() throws Exception {
this.handler = new SpringBootKinesisEventHandler<>(FunctionMessageConfig.class);
this.handler.initialize();
KinesisEvent event = asKinesisEvent(asList(new Foo("foo"), new Foo("bar")));

View File

@@ -39,7 +39,6 @@ public class SpringBootRequestHandlerTests {
@Test
public void functionBean() throws Exception {
this.handler = new SpringBootRequestHandler<Foo, Bar>(FunctionConfig.class);
this.handler.initialize();
Object output = this.handler.handleRequest(new Foo("foo"), null);
assertThat(output).isInstanceOf(Bar.class);
}

View File

@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.util.function.Function;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
@@ -58,6 +59,46 @@ public class SpringBootStreamHandlerTests {
assertThat(output.toString()).isEqualTo("{\"value\":\"FOO\"}");
}
@Test
public void functionNonFluxBeanNoCatalog() throws Exception {
this.handler = new SpringBootStreamHandler(NoCatalogNonFluxFunctionConfig.class);
this.handler.initialize(null);
ByteArrayOutputStream output = new ByteArrayOutputStream();
this.handler.handleRequest(
new ByteArrayInputStream("{\"value\":\"foo\"}".getBytes()), output, null);
assertThat(output.toString()).isEqualTo("{\"value\":\"FOO\"}");
}
@Test
public void functionFluxBeanNoCatalog() throws Exception {
this.handler = new SpringBootStreamHandler(NoCatalogFluxFunctionConfig.class);
this.handler.initialize(null);
ByteArrayOutputStream output = new ByteArrayOutputStream();
this.handler.handleRequest(
new ByteArrayInputStream("{\"value\":\"foo\"}".getBytes()), output, null);
assertThat(output.toString()).isEqualTo("{\"value\":\"FOO\"}");
}
@Configuration
protected static class NoCatalogNonFluxFunctionConfig {
@Bean
public Function<Foo, Bar> function() {
return foo -> new Bar(foo.getValue().toUpperCase());
}
}
@Configuration
protected static class NoCatalogFluxFunctionConfig {
@Bean
public Function<Flux<Foo>, Flux<Bar>> function() {
return flux -> flux.map(foo -> new Bar(foo.getValue().toUpperCase()));
}
}
@Configuration
@Import({ ContextFunctionCatalogAutoConfiguration.class,
JacksonAutoConfiguration.class })