Add FunctionInspector to deployer so types can be inspected

ALso added a bunch of DEBUG logging because it's hard to debug the
deployer app.

Fixed gh-53
This commit is contained in:
Dave Syer
2017-05-22 10:57:47 +02:00
parent 3606e51d78
commit 6e1f0b9a6e
6 changed files with 87 additions and 11 deletions

View File

@@ -15,7 +15,9 @@
*/
package org.springframework.cloud.function.deployer;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.ContextFunctionCatalogAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -25,6 +27,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
@ConditionalOnClass(FunctionExtractingFunctionCatalog.class)
@AutoConfigureBefore(ContextFunctionCatalogAutoConfiguration.class)
public class FunctionExtractingAutoConfiguration {
@Bean

View File

@@ -21,12 +21,19 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.deployer.thin.ThinJarAppDeployer;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.util.MethodInvoker;
public class FunctionExtractingFunctionCatalog implements FunctionCatalog {
public class FunctionExtractingFunctionCatalog implements FunctionCatalog, FunctionInspector {
private static Log logger = LogFactory
.getLog(FunctionExtractingFunctionCatalog.class);
private final Set<String> deployed = new HashSet<>();
@@ -43,19 +50,39 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog {
@SuppressWarnings("unchecked")
@Override
public <T> Consumer<T> lookupConsumer(String name) {
return (Consumer<T>) find(name, "lookupConsumer");
return (Consumer<T>) lookup(name, "lookupConsumer");
}
@SuppressWarnings("unchecked")
@Override
public <T, R> Function<T, R> lookupFunction(String name) {
return (Function<T, R>) find(name, "lookupFunction");
return (Function<T, R>) lookup(name, "lookupFunction");
}
@SuppressWarnings("unchecked")
@Override
public <T> Supplier<T> lookupSupplier(String name) {
return (Supplier<T>) find(name, "lookupSupplier");
return (Supplier<T>) lookup(name, "lookupSupplier");
}
@Override
public Class<?> getInputType(String name) {
return (Class<?>) inspect(name, "getInputType");
}
@Override
public Class<?> getOutputType(String name) {
return (Class<?>) inspect(name, "getOutputType");
}
@Override
public Object convert(String name, String value) {
return inspect(name, "convert");
}
@Override
public String getName(Object function) {
return (String) inspect(function, "getName");
}
public String deploy(AppDeploymentRequest request) {
@@ -69,9 +96,23 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog {
deployed.remove(id);
}
private Object find(String name, String method) {
private Object inspect(Object arg, String method) {
if (logger.isDebugEnabled()) {
logger.debug("Inspecting " + method);
}
return invoke(FunctionInspector.class, method, arg);
}
private Object lookup(String name, String method) {
if (logger.isDebugEnabled()) {
logger.debug("Looking up " + name + " with " + method);
}
return invoke(FunctionCatalog.class, method, name);
}
private Object invoke(Class<?> type, String method, Object arg) {
for (String id : deployed) {
Object catalog = deployer.getBean(id, FunctionCatalog.class);
Object catalog = deployer.getBean(id, type);
if (catalog == null) {
continue;
}
@@ -79,7 +120,7 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog {
MethodInvoker invoker = new MethodInvoker();
invoker.setTargetObject(catalog);
invoker.setTargetMethod(method);
invoker.setArguments(new Object[] { name });
invoker.setArguments(new Object[] { arg });
invoker.prepare();
Object result = invoker.invoke();
if (result != null) {

View File

@@ -29,17 +29,17 @@ public class SampleApplication {
@Bean
public Function<Flux<Foo>, Flux<Bar>> uppercase() {
return flux -> flux.map(value -> new Bar(value.uppercase()));
return flux -> flux.log().map(value -> new Bar(value.uppercase()));
}
@Bean
public Supplier<Flux<Bar>> words() {
return () -> Flux.fromArray(new Bar[] { new Bar("foo"), new Bar("bar") });
return () -> Flux.fromArray(new Bar[] { new Bar("foo"), new Bar("bar") }).log();
}
@Bean
public Function<Flux<Foo>, Flux<Bar>> lowercase() {
return flux -> flux.map(value -> new Bar(value.lowercase()));
return flux -> flux.log().map(value -> new Bar(value.lowercase()));
}
public static void main(String[] args) throws Exception {

View File

@@ -20,6 +20,9 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.support.FluxSupplier;
@@ -44,7 +47,9 @@ import reactor.core.publisher.Mono;
*/
@Component
public class FunctionController {
private static Log logger = LogFactory.getLog(FunctionController.class);
private FunctionInspector inspector;
@Value("${debug:${DEBUG:false}}")
@@ -62,11 +67,17 @@ public class FunctionController {
@RequestBody FluxRequest<?> body) {
if (function != null) {
Flux<?> result = (Flux<?>) function.apply(body.flux());
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with function");
}
return ResponseEntity.ok().body(debug ? result.log() : result);
}
if (consumer != null) {
Flux<?> flux = body.flux().cache(); // send a copy back to the caller
consumer.accept(flux);
if (logger.isDebugEnabled()) {
logger.debug("Handled POST with consumer");
}
return ResponseEntity.status(HttpStatus.ACCEPTED).body(flux);
}
throw new IllegalArgumentException("no such function");
@@ -90,6 +101,9 @@ public class FunctionController {
supplier = new FluxSupplier(supplier);
}
Flux<?> result = supplier.get();
if (logger.isDebugEnabled()) {
logger.debug("Handled GET with supplier");
}
return debug ? result.log() : result;
}
@@ -97,6 +111,9 @@ public class FunctionController {
@PathVariable String value) {
Object input = inspector.convert(inspector.getName(function), value);
Mono<?> result = Mono.from(function.apply(Flux.just(input)));
if (logger.isDebugEnabled()) {
logger.debug("Handled GET with function");
}
return debug ? result.log() : result;
}
}

View File

@@ -61,6 +61,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
@Autowired
public FunctionHandlerMapping(FunctionCatalog catalog, FunctionInspector inspector) {
this.functions = catalog;
logger.info("FunctionCatalog: " + catalog + ", FunctionInspector: " + inspector);
setOrder(super.getOrder() - 5);
this.controller = new FunctionController(inspector);
}
@@ -91,11 +92,17 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
}
Object function = findFunctionForGet(request, path);
if (function != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found function for GET: " + path);
}
request.setAttribute(FluxHandlerMethodArgumentResolver.HANDLER, function);
return handler;
}
function = findFunctionForPost(request, path);
if (function != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found function for POST: " + path);
}
request.setAttribute(FluxHandlerMethodArgumentResolver.HANDLER, function);
return handler;
}

View File

@@ -22,6 +22,8 @@ import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionInspector;
@@ -48,6 +50,9 @@ import reactor.core.publisher.Mono;
*/
public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
private static Log logger = LogFactory
.getLog(FluxReturnValueHandler.class);
private ResponseBodyEmitterReturnValueHandler delegate;
private long timeout = 1000L;
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@@ -126,6 +131,9 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
} else {
mediaType = findMediaType(webRequest);
}
if (logger.isDebugEnabled()) {
logger.debug("Handling return value " + type + " with media type: " + mediaType);
}
delegate.handleReturnValue(getEmitter(timeout, flux, mediaType), returnType,
mavContainer, webRequest);
}