committed by
Oleg Zhurakousky
parent
bb066b9b3f
commit
ba1094df63
@@ -14,102 +14,105 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.adapter.azure;
|
||||
package org.springframework.cloud.function.adapter.azure;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import com.microsoft.azure.functions.HttpMethod;
|
||||
import com.microsoft.azure.functions.HttpRequestMessage;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Christian Tzolov
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Chris Bono
|
||||
* @since 4.0
|
||||
*
|
||||
*/
|
||||
public class AzureFunctionUtil {
|
||||
|
||||
public static String EXECUTION_CONTEXT = "executionContext";
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public static <I> Object enhanceInputIfNecessary(Object input, ExecutionContext executionContext) {
|
||||
if (input == null) { // Supplier
|
||||
return input;
|
||||
}
|
||||
if (input instanceof Publisher) {
|
||||
return Flux.from((Publisher) input).map(item -> {
|
||||
if (item instanceof Message) {
|
||||
return MessageBuilder.fromMessage((Message<I>) item)
|
||||
.setHeaderIfAbsent(EXECUTION_CONTEXT, executionContext).build();
|
||||
}
|
||||
else {
|
||||
return constructInputMessageFromItem(input, executionContext);
|
||||
}
|
||||
});
|
||||
}
|
||||
else if (input instanceof Message) {
|
||||
return MessageBuilder.fromMessage((Message<I>) input)
|
||||
.setHeaderIfAbsent(EXECUTION_CONTEXT, executionContext).build();
|
||||
}
|
||||
else if (input instanceof Iterable) {
|
||||
return Flux.fromIterable((Iterable) input).map(item -> {
|
||||
return constructInputMessageFromItem(item, executionContext);
|
||||
});
|
||||
}
|
||||
return constructInputMessageFromItem(input, executionContext);
|
||||
}
|
||||
|
||||
private static <I> Message<?> constructInputMessageFromItem(Object input, ExecutionContext executionContext) {
|
||||
MessageBuilder<?> messageBuilder = null;
|
||||
if (input instanceof HttpRequestMessage) {
|
||||
HttpRequestMessage<I> requestMessage = (HttpRequestMessage<I>) input;
|
||||
Object payload = requestMessage.getHttpMethod() != null
|
||||
&& requestMessage.getHttpMethod().equals(HttpMethod.GET)
|
||||
? requestMessage.getQueryParameters()
|
||||
: requestMessage.getBody();
|
||||
|
||||
if (payload == null) {
|
||||
payload = Optional.empty();
|
||||
}
|
||||
messageBuilder = MessageBuilder.withPayload(payload).copyHeaders(getHeaders(requestMessage));
|
||||
}
|
||||
else {
|
||||
messageBuilder = MessageBuilder.withPayload(input);
|
||||
}
|
||||
return messageBuilder.setHeaderIfAbsent(EXECUTION_CONTEXT, executionContext).build();
|
||||
}
|
||||
|
||||
private static <I> MessageHeaders getHeaders(HttpRequestMessage<I> event) {
|
||||
Map<String, Object> headers = new HashMap<String, Object>();
|
||||
|
||||
if (event.getHeaders() != null) {
|
||||
headers.putAll(event.getHeaders());
|
||||
}
|
||||
if (event.getQueryParameters() != null) {
|
||||
headers.putAll(event.getQueryParameters());
|
||||
}
|
||||
if (event.getUri() != null) {
|
||||
headers.put("path", event.getUri().getPath());
|
||||
}
|
||||
|
||||
if (event.getHttpMethod() != null) {
|
||||
headers.put("httpMethod", event.getHttpMethod().toString());
|
||||
}
|
||||
|
||||
headers.put("request", event.getBody());
|
||||
return new MessageHeaders(headers);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import com.microsoft.azure.functions.HttpMethod;
|
||||
import com.microsoft.azure.functions.HttpRequestMessage;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
/**
|
||||
* @author Christian Tzolov
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Chris Bono
|
||||
* @since 4.0
|
||||
*/
|
||||
public final class AzureFunctionUtil {
|
||||
|
||||
/**
|
||||
* Message header key name used to store and extract the ExecutionContext.
|
||||
*/
|
||||
public static String EXECUTION_CONTEXT = "executionContext";
|
||||
|
||||
private AzureFunctionUtil() {
|
||||
};
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public static <I> Object enhanceInputIfNecessary(Object input, ExecutionContext executionContext) {
|
||||
if (input == null) { // Supplier
|
||||
return input;
|
||||
}
|
||||
if (input instanceof Publisher) {
|
||||
return Flux.from((Publisher) input).map(item -> {
|
||||
if (item instanceof Message) {
|
||||
return MessageBuilder.fromMessage((Message<I>) item)
|
||||
.setHeaderIfAbsent(EXECUTION_CONTEXT, executionContext).build();
|
||||
}
|
||||
else {
|
||||
return constructInputMessageFromItem(input, executionContext);
|
||||
}
|
||||
});
|
||||
}
|
||||
else if (input instanceof Message) {
|
||||
return MessageBuilder.fromMessage((Message<I>) input)
|
||||
.setHeaderIfAbsent(EXECUTION_CONTEXT, executionContext).build();
|
||||
}
|
||||
else if (input instanceof Iterable) {
|
||||
return Flux.fromIterable((Iterable) input).map(item -> {
|
||||
return constructInputMessageFromItem(item, executionContext);
|
||||
});
|
||||
}
|
||||
return constructInputMessageFromItem(input, executionContext);
|
||||
}
|
||||
|
||||
private static <I> Message<?> constructInputMessageFromItem(Object input, ExecutionContext executionContext) {
|
||||
MessageBuilder<?> messageBuilder = null;
|
||||
if (input instanceof HttpRequestMessage) {
|
||||
HttpRequestMessage<I> requestMessage = (HttpRequestMessage<I>) input;
|
||||
Object payload = requestMessage.getHttpMethod() != null
|
||||
&& requestMessage.getHttpMethod().equals(HttpMethod.GET)
|
||||
? requestMessage.getQueryParameters()
|
||||
: requestMessage.getBody();
|
||||
|
||||
if (payload == null) {
|
||||
payload = Optional.empty();
|
||||
}
|
||||
messageBuilder = MessageBuilder.withPayload(payload).copyHeaders(getHeaders(requestMessage));
|
||||
}
|
||||
else {
|
||||
messageBuilder = MessageBuilder.withPayload(input);
|
||||
}
|
||||
return messageBuilder.setHeaderIfAbsent(EXECUTION_CONTEXT, executionContext).build();
|
||||
}
|
||||
|
||||
private static <I> MessageHeaders getHeaders(HttpRequestMessage<I> event) {
|
||||
Map<String, Object> headers = new HashMap<String, Object>();
|
||||
|
||||
if (event.getHeaders() != null) {
|
||||
headers.putAll(event.getHeaders());
|
||||
}
|
||||
if (event.getQueryParameters() != null) {
|
||||
headers.putAll(event.getQueryParameters());
|
||||
}
|
||||
if (event.getUri() != null) {
|
||||
headers.put("path", event.getUri().getPath());
|
||||
}
|
||||
|
||||
if (event.getHttpMethod() != null) {
|
||||
headers.put("httpMethod", event.getHttpMethod().toString());
|
||||
}
|
||||
|
||||
headers.put("request", event.getBody());
|
||||
return new MessageHeaders(headers);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -14,144 +14,143 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.adapter.azure.injector;
|
||||
package org.springframework.cloud.function.adapter.azure.injector;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import com.microsoft.azure.functions.HttpMethod;
|
||||
import com.microsoft.azure.functions.HttpRequestMessage;
|
||||
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
|
||||
import com.microsoft.azure.functions.annotation.FunctionName;
|
||||
import com.microsoft.azure.functions.annotation.HttpTrigger;
|
||||
import com.microsoft.azure.functions.spi.inject.FunctionInstanceInjector;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurationExcludeFilter;
|
||||
import org.springframework.boot.context.TypeExcludeFilter;
|
||||
import org.springframework.cloud.function.adapter.azure.AzureFunctionInstanceInjector;
|
||||
import org.springframework.cloud.function.adapter.azure.AzureFunctionUtil;
|
||||
import org.springframework.cloud.function.adapter.azure.HttpFunctionInvokerTests;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.ComponentScan.Filter;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.FilterType;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* @author Christian Tzolov
|
||||
*/
|
||||
public class AzureFunctionInstanceInjectorTest {
|
||||
|
||||
static ExecutionContext executionContext = new ExecutionContext() {
|
||||
@Override
|
||||
public Logger getLogger() {
|
||||
return Logger.getLogger(AzureFunctionInstanceInjectorTest.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInvocationId() {
|
||||
return "id1";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFunctionName() {
|
||||
return "hello";
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
public void testFunctionInjector() throws Exception {
|
||||
|
||||
FunctionInstanceInjector injector = initializeFunctionInstanceInjector();
|
||||
Assertions.assertThat(injector).isNotNull();
|
||||
Assertions.assertThat(injector).isInstanceOf(AzureFunctionInstanceInjector.class);
|
||||
|
||||
System.setProperty("MAIN_CLASS", MyMainConfig.class.getName());
|
||||
|
||||
MyAzureTestFunction functionInstance = injector.getInstance(MyAzureTestFunction.class);
|
||||
|
||||
HttpFunctionInvokerTests.HttpRequestMessageStub<Optional<String>> request = new HttpFunctionInvokerTests.HttpRequestMessageStub<Optional<String>>();
|
||||
|
||||
request.setBody(Optional.of("test"));
|
||||
|
||||
String result = functionInstance.execute(request, executionContext);
|
||||
|
||||
Assertions.assertThat(result).isEqualTo("TEST");
|
||||
|
||||
Assertions.assertThat(functionInstance).isNotNull();
|
||||
Assertions.assertThat(functionInstance).isInstanceOf(MyAzureTestFunction.class);
|
||||
}
|
||||
|
||||
private static FunctionInstanceInjector initializeFunctionInstanceInjector() {
|
||||
FunctionInstanceInjector functionInstanceInjector = null;
|
||||
ClassLoader prevContextClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Iterator<FunctionInstanceInjector> iterator = ServiceLoader.load(FunctionInstanceInjector.class).iterator();
|
||||
if (iterator.hasNext()) {
|
||||
functionInstanceInjector = iterator.next();
|
||||
if (iterator.hasNext()) {
|
||||
throw new RuntimeException(
|
||||
"Customer function app has multiple FunctionInstanceInjector implementations");
|
||||
}
|
||||
}
|
||||
else {
|
||||
functionInstanceInjector = new FunctionInstanceInjector() {
|
||||
@Override
|
||||
public <T> T getInstance(Class<T> functionClass) throws Exception {
|
||||
return functionClass.getDeclaredConstructor().newInstance();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
finally {
|
||||
Thread.currentThread().setContextClassLoader(prevContextClassLoader);
|
||||
}
|
||||
return functionInstanceInjector;
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
|
||||
@Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
|
||||
public static class MyMainConfig {
|
||||
|
||||
@Bean
|
||||
public Function<Message<String>, String> uppercase() {
|
||||
return message -> {
|
||||
ExecutionContext context = (ExecutionContext) message.getHeaders()
|
||||
.get(AzureFunctionUtil.EXECUTION_CONTEXT);
|
||||
Assertions.assertThat(context).isNotNull();
|
||||
Assertions.assertThat(context.getFunctionName()).isEqualTo("hello");
|
||||
return message.getPayload().toUpperCase();
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
public static class MyAzureTestFunction {
|
||||
|
||||
@Autowired
|
||||
private Function<Message<String>, String> uppercase;
|
||||
|
||||
@FunctionName("ditest")
|
||||
public String execute(
|
||||
@HttpTrigger(name = "req", methods = { HttpMethod.GET,
|
||||
HttpMethod.POST }, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
|
||||
ExecutionContext context) {
|
||||
|
||||
Message<String> enhancedRequest = (Message<String>) AzureFunctionUtil.enhanceInputIfNecessary(
|
||||
request.getBody().get(),
|
||||
context);
|
||||
return uppercase.apply(enhancedRequest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import com.microsoft.azure.functions.HttpMethod;
|
||||
import com.microsoft.azure.functions.HttpRequestMessage;
|
||||
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
|
||||
import com.microsoft.azure.functions.annotation.FunctionName;
|
||||
import com.microsoft.azure.functions.annotation.HttpTrigger;
|
||||
import com.microsoft.azure.functions.spi.inject.FunctionInstanceInjector;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurationExcludeFilter;
|
||||
import org.springframework.boot.context.TypeExcludeFilter;
|
||||
import org.springframework.cloud.function.adapter.azure.AzureFunctionInstanceInjector;
|
||||
import org.springframework.cloud.function.adapter.azure.AzureFunctionUtil;
|
||||
import org.springframework.cloud.function.adapter.azure.HttpFunctionInvokerTests;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.ComponentScan.Filter;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.FilterType;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* @author Christian Tzolov
|
||||
*/
|
||||
public class AzureFunctionInstanceInjectorTest {
|
||||
|
||||
static ExecutionContext executionContext = new ExecutionContext() {
|
||||
@Override
|
||||
public Logger getLogger() {
|
||||
return Logger.getLogger(AzureFunctionInstanceInjectorTest.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInvocationId() {
|
||||
return "id1";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFunctionName() {
|
||||
return "hello";
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
public void testFunctionInjector() throws Exception {
|
||||
|
||||
FunctionInstanceInjector injector = initializeFunctionInstanceInjector();
|
||||
Assertions.assertThat(injector).isNotNull();
|
||||
Assertions.assertThat(injector).isInstanceOf(AzureFunctionInstanceInjector.class);
|
||||
|
||||
System.setProperty("MAIN_CLASS", MyMainConfig.class.getName());
|
||||
|
||||
MyAzureTestFunction functionInstance = injector.getInstance(MyAzureTestFunction.class);
|
||||
|
||||
HttpFunctionInvokerTests.HttpRequestMessageStub<Optional<String>> request = new HttpFunctionInvokerTests.HttpRequestMessageStub<Optional<String>>();
|
||||
|
||||
request.setBody(Optional.of("test"));
|
||||
|
||||
String result = functionInstance.execute(request, executionContext);
|
||||
|
||||
Assertions.assertThat(result).isEqualTo("TEST");
|
||||
|
||||
Assertions.assertThat(functionInstance).isNotNull();
|
||||
Assertions.assertThat(functionInstance).isInstanceOf(MyAzureTestFunction.class);
|
||||
}
|
||||
|
||||
private static FunctionInstanceInjector initializeFunctionInstanceInjector() {
|
||||
FunctionInstanceInjector functionInstanceInjector = null;
|
||||
ClassLoader prevContextClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Iterator<FunctionInstanceInjector> iterator = ServiceLoader.load(FunctionInstanceInjector.class).iterator();
|
||||
if (iterator.hasNext()) {
|
||||
functionInstanceInjector = iterator.next();
|
||||
if (iterator.hasNext()) {
|
||||
throw new RuntimeException(
|
||||
"Customer function app has multiple FunctionInstanceInjector implementations");
|
||||
}
|
||||
}
|
||||
else {
|
||||
functionInstanceInjector = new FunctionInstanceInjector() {
|
||||
@Override
|
||||
public <T> T getInstance(Class<T> functionClass) throws Exception {
|
||||
return functionClass.getDeclaredConstructor().newInstance();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
finally {
|
||||
Thread.currentThread().setContextClassLoader(prevContextClassLoader);
|
||||
}
|
||||
return functionInstanceInjector;
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
|
||||
@Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
|
||||
public static class MyMainConfig {
|
||||
|
||||
@Bean
|
||||
public Function<Message<String>, String> uppercase() {
|
||||
return message -> {
|
||||
ExecutionContext context = (ExecutionContext) message.getHeaders()
|
||||
.get(AzureFunctionUtil.EXECUTION_CONTEXT);
|
||||
Assertions.assertThat(context).isNotNull();
|
||||
Assertions.assertThat(context.getFunctionName()).isEqualTo("hello");
|
||||
return message.getPayload().toUpperCase();
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
public static class MyAzureTestFunction {
|
||||
|
||||
@Autowired
|
||||
private Function<Message<String>, String> uppercase;
|
||||
|
||||
@FunctionName("ditest")
|
||||
public String execute(
|
||||
@HttpTrigger(name = "req", methods = { HttpMethod.GET,
|
||||
HttpMethod.POST }, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
|
||||
ExecutionContext context) {
|
||||
|
||||
Message<String> enhancedRequest = (Message<String>) AzureFunctionUtil.enhanceInputIfNecessary(
|
||||
request.getBody().get(),
|
||||
context);
|
||||
return uppercase.apply(enhancedRequest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user