GH-600 Fix logic in Azure adapter to ensure proper handling of sveral functions
This also addresses re-initialization of AC when the second function is invoked Added second function to the azure examples Resolves #600
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2017-2019 the original author or authors.
|
||||
* Copyright 2017-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -28,6 +28,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -45,7 +46,7 @@ public class AzureSpringBootRequestHandler<I, O> extends AbstractSpringFunctionA
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static AzureSpringBootRequestHandler thisInitializer;
|
||||
|
||||
private String functionName;
|
||||
private static FunctionCatalog functionCatalog;
|
||||
|
||||
private final static ExecutionContextDelegate EXECUTION_CTX_DELEGATE = new ExecutionContextDelegate();
|
||||
|
||||
@@ -82,13 +83,15 @@ public class AzureSpringBootRequestHandler<I, O> extends AbstractSpringFunctionA
|
||||
* since Azure creates a new instance of this handler for each invocation,
|
||||
* see https://github.com/spring-cloud/spring-cloud-function/issues/425
|
||||
*/
|
||||
if (thisInitializer == null || !thisInitializer.functionName.equals(name)) {
|
||||
if (thisInitializer == null /*|| !thisInitializer.functionName.equals(name)*/) {
|
||||
initialize(EXECUTION_CTX_DELEGATE);
|
||||
this.functionName = name;
|
||||
functionCatalog = this.catalog;
|
||||
thisInitializer = this;
|
||||
return (O) thisInitializer.handleRequest(input, context);
|
||||
}
|
||||
else {
|
||||
this.catalog = functionCatalog;
|
||||
thisInitializer.clear(name);
|
||||
Publisher<?> events = input == null ? Mono.empty() : extract(convertEvent(input));
|
||||
if (events instanceof Flux) {
|
||||
events = Flux.from(events).map(v -> this.toMessage(v, context));
|
||||
@@ -109,21 +112,10 @@ public class AzureSpringBootRequestHandler<I, O> extends AbstractSpringFunctionA
|
||||
}
|
||||
}
|
||||
|
||||
private Message<?> toMessage(Object value, ExecutionContext context) {
|
||||
if (value instanceof Message) {
|
||||
return (Message<?>) value;
|
||||
}
|
||||
else {
|
||||
Object payload = value;
|
||||
if (value instanceof HttpRequestMessage) {
|
||||
payload = ((HttpRequestMessage) value).getBody();
|
||||
if (payload == null) {
|
||||
payload = ((HttpRequestMessage) value).getQueryParameters();
|
||||
}
|
||||
}
|
||||
return MessageBuilder.withPayload(payload)
|
||||
.setHeader(AbstractSpringFunctionAdapterInitializer.TARGET_EXECUTION_CTX_NAME, context).build();
|
||||
}
|
||||
public void handleOutput(I input, OutputBinding<O> binding,
|
||||
ExecutionContext context) {
|
||||
O result = handleRequest(input, context);
|
||||
binding.setValue(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -131,12 +123,6 @@ public class AzureSpringBootRequestHandler<I, O> extends AbstractSpringFunctionA
|
||||
return ((ExecutionContext) targetContext).getFunctionName();
|
||||
}
|
||||
|
||||
public void handleOutput(I input, OutputBinding<O> binding,
|
||||
ExecutionContext context) {
|
||||
O result = handleRequest(input, context);
|
||||
binding.setValue(result);
|
||||
}
|
||||
|
||||
protected Object convertEvent(I input) {
|
||||
return input;
|
||||
}
|
||||
@@ -148,7 +134,6 @@ public class AzureSpringBootRequestHandler<I, O> extends AbstractSpringFunctionA
|
||||
return Flux.just(input);
|
||||
}
|
||||
|
||||
|
||||
protected boolean isSingleInput(Function<?, ?> function, Object input) {
|
||||
if (!(input instanceof Collection)) {
|
||||
return true;
|
||||
@@ -171,6 +156,25 @@ public class AzureSpringBootRequestHandler<I, O> extends AbstractSpringFunctionA
|
||||
return ((Collection<?>) output).size() <= 1;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private Message<?> toMessage(Object value, ExecutionContext context) {
|
||||
if (value instanceof Message) {
|
||||
return (Message<?>) value;
|
||||
}
|
||||
else {
|
||||
Object payload = value;
|
||||
if (value instanceof HttpRequestMessage) {
|
||||
payload = ((HttpRequestMessage) value).getBody();
|
||||
if (payload == null) {
|
||||
payload = ((HttpRequestMessage) value).getQueryParameters();
|
||||
}
|
||||
}
|
||||
return MessageBuilder.withPayload(payload)
|
||||
.setHeader(AbstractSpringFunctionAdapterInitializer.TARGET_EXECUTION_CTX_NAME, context).build();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class ExecutionContextDelegate implements ExecutionContext {
|
||||
|
||||
ExecutionContext targetContext;
|
||||
|
||||
Reference in New Issue
Block a user