Harmonize AWS and azure adapters

This commit is contained in:
Dave Syer
2018-10-12 17:10:13 +01:00
parent 3966f378b7
commit 083d5e3bf3
3 changed files with 56 additions and 60 deletions

View File

@@ -21,10 +21,9 @@ import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.jar.Manifest;
import org.apache.commons.logging.Log;
@@ -36,10 +35,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
@@ -55,10 +57,6 @@ public class SpringFunctionInitializer implements Closeable {
private Function<Publisher<?>, Publisher<?>> function;
private Consumer<Publisher<?>> consumer;
private Supplier<Publisher<?>> supplier;
private AtomicBoolean initialized = new AtomicBoolean();
@Autowired(required = false)
@@ -94,30 +92,22 @@ public class SpringFunctionInitializer implements Closeable {
ConfigurableApplicationContext context = builder.run();
context.getAutowireCapableBeanFactory().autowireBean(this);
String name = context.getEnvironment().getProperty("function.name");
boolean defaultName = false;
if (name == null) {
name = "function";
defaultName = true;
}
if (this.catalog == null) {
this.function = context.getBean(name, Function.class);
if (context.containsBean(name)) {
this.function = context.getBean(name, Function.class);
}
}
else {
this.function = this.catalog.lookup(Function.class, name);
if (this.function == null) {
if (defaultName) {
name = "consumer";
}
Set<String> functionNames = this.catalog.getNames(Function.class);
if (functionNames.size() == 1) {
this.function = this.catalog.lookup(Function.class,
functionNames.iterator().next());
}
else {
this.function = this.catalog.lookup(Function.class, name);
if (this.function == null) {
this.consumer = this.catalog.lookup(Consumer.class, name);
if (this.consumer == null) {
if (defaultName) {
name = "supplier";
}
this.supplier = this.catalog.lookup(Supplier.class, name);
}
}
}
}
this.context = context;
@@ -125,22 +115,39 @@ public class SpringFunctionInitializer implements Closeable {
}
private SpringApplication springApplication() {
ApplicationContextInitializer<?> initializer = null;
ApplicationContextInitializer<GenericApplicationContext> initializer = null;
Class<?> sourceClass = configurationClass;
if (ApplicationContextInitializer.class.isAssignableFrom(sourceClass)) {
initializer = BeanUtils.instantiateClass(configurationClass, ApplicationContextInitializer.class);
@SuppressWarnings("unchecked")
ApplicationContextInitializer<GenericApplicationContext> instance = BeanUtils
.instantiateClass(configurationClass,
ApplicationContextInitializer.class);
initializer = instance;
sourceClass = Object.class;
}
else if (Function.class.isAssignableFrom(sourceClass)) {
@SuppressWarnings("unchecked")
final Class<Function<?, ?>> type = (Class<Function<?, ?>>) sourceClass;
initializer = context -> {
context.registerBean(FunctionRegistration.class,
() -> new FunctionRegistration<>(
context.getAutowireCapableBeanFactory().createBean(type))
.type(FunctionType.of(type)));
};
sourceClass = Object.class;
}
SpringApplication application;
if (initializer!=null) {
if (initializer != null) {
application = new SpringApplication(sourceClass) {
@Override
protected void load(ApplicationContext context, Object[] sources) {
}
};
application.addInitializers(initializer);
application.setDefaultProperties(Collections.singletonMap("spring.functional.enabled", "true"));
} else {
application.setDefaultProperties(
Collections.singletonMap("spring.functional.enabled", "true"));
}
else {
application = new SpringApplication(sourceClass);
}
application.setWebApplicationType(WebApplicationType.NONE);
@@ -155,21 +162,13 @@ public class SpringFunctionInitializer implements Closeable {
}
protected Object function() {
return this.function != null ? this.function
: (this.consumer != null ? this.consumer : this.supplier);
return this.function;
}
protected Publisher<?> apply(Publisher<?> input) {
if (this.function != null) {
return Flux.from(function.apply(input));
}
if (this.consumer != null) {
this.consumer.accept(input);
return Flux.empty();
}
if (this.supplier != null) {
return this.supplier.get();
}
throw new IllegalStateException("No function defined");
}

View File

@@ -18,7 +18,6 @@ package org.springframework.cloud.function.adapter.aws;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.junit.After;
@@ -62,6 +61,14 @@ public class SpringFunctionInitializerTests {
assertThat(result.blockFirst()).isInstanceOf(Bar.class);
}
@Test
public void functionApp() {
initializer = new SpringFunctionInitializer(FluxFunctionApp.class);
initializer.initialize();
Flux<?> result = Flux.from(initializer.apply(Flux.just(new Foo())));
assertThat(result.blockFirst()).isInstanceOf(Bar.class);
}
@Test
public void functionCatalog() {
initializer = new SpringFunctionInitializer(FunctionConfig.class);
@@ -96,14 +103,6 @@ public class SpringFunctionInitializerTests {
assertThat(result.toStream().collect(Collectors.toList())).isEmpty();
}
@Test
public void supplierCatalog() {
initializer = new SpringFunctionInitializer(SupplierConfig.class);
initializer.initialize();
Flux<?> result = Flux.from(initializer.apply(Flux.empty()));
assertThat(result.blockFirst()).isInstanceOf(Bar.class);
}
@Configuration
protected static class FluxFunctionConfig {
@Bean
@@ -112,6 +111,13 @@ public class SpringFunctionInitializerTests {
}
}
protected static class FluxFunctionApp implements Function<Flux<Foo>, Flux<Bar>> {
@Override
public Flux<Bar> apply(Flux<Foo> flux) {
return flux.map(foo -> new Bar());
}
}
protected static class FunctionRegistrar
implements ApplicationContextInitializer<GenericApplicationContext> {
@@ -147,15 +153,6 @@ public class SpringFunctionInitializerTests {
}
}
@Configuration
@Import(ContextFunctionCatalogAutoConfiguration.class)
protected static class SupplierConfig {
@Bean
public Supplier<Bar> supplier() {
return () -> new Bar();
}
}
@Configuration
@Import(ContextFunctionCatalogAutoConfiguration.class)
protected static class ConsumerConfig {