GH-606 add initila support for generating default cloud event attributes
This commit is contained in:
@@ -16,6 +16,10 @@
|
||||
|
||||
package org.springframework.cloud.function.cloudevent;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
/**
|
||||
@@ -57,4 +61,12 @@ public interface CloudEventAtttributesProvider {
|
||||
* @return modifiable instance of {@link CloudEventAttributes}
|
||||
*/
|
||||
RequiredAttributeAccessor get(MessageHeaders headers);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param inputMessage input message used to invoke user functionality (e.g., function)
|
||||
* @param result result of the invocation of user functionality (e.g., function)
|
||||
* @return instance of {@link CloudEventAttributes}
|
||||
*/
|
||||
Map<String, Object> generateDefaultCloudEventHeaders(Message<?> inputMessage, Object result);
|
||||
}
|
||||
|
||||
@@ -16,12 +16,20 @@
|
||||
|
||||
package org.springframework.cloud.function.cloudevent;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -29,10 +37,9 @@ import org.springframework.util.Assert;
|
||||
* @since 3.1
|
||||
*
|
||||
*/
|
||||
public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider {
|
||||
/*
|
||||
* should i provide instance() method for convinience or should it be always injected into function
|
||||
*/
|
||||
public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider, ApplicationContextAware {
|
||||
|
||||
private ConfigurableApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
|
||||
@@ -61,4 +68,26 @@ public class DefaultCloudEventAttributesProvider implements CloudEventAtttribute
|
||||
return new RequiredAttributeAccessor(headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> generateDefaultCloudEventHeaders(Message<?> inputMessage, Object result) {
|
||||
if (inputMessage.getHeaders().containsKey(CloudEventMessageUtils.CE_ID)) { // input is a cloud event
|
||||
String applicationName = this.getApplicationName();
|
||||
return this.get(inputMessage.getHeaders())
|
||||
.setId(UUID.randomUUID().toString())
|
||||
.setType(result.getClass().getName())
|
||||
.setSource(applicationName);
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
|
||||
}
|
||||
|
||||
private String getApplicationName() {
|
||||
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
|
||||
String name = environment.getProperty("spring.application.name");
|
||||
return "http://spring.io/" + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
@@ -30,7 +31,9 @@ import org.aopalliance.intercept.MethodInvocation;
|
||||
import org.springframework.aop.framework.ProxyFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistration;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
@@ -39,7 +42,9 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.core.convert.ConversionService;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@@ -51,6 +56,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
||||
|
||||
private GenericApplicationContext applicationContext;
|
||||
|
||||
@Autowired(required = false)
|
||||
private CloudEventAtttributesProvider cloudEventAtttributesProvider;
|
||||
|
||||
|
||||
public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) {
|
||||
super(conversionService, messageConverter, jsonMapper);
|
||||
@@ -150,9 +158,26 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
||||
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
|
||||
}
|
||||
|
||||
if (function != null && this.cloudEventAtttributesProvider != null) {
|
||||
BiFunction<Message<?>, Object, Message<?>> invocationResultHeaderEnricher = new BiFunction<Message<?>, Object, Message<?>>() {
|
||||
|
||||
@Override
|
||||
public Message<?> apply(Message<?> inputMessage, Object invocationResult) {
|
||||
Message message = MessageBuilder.withPayload(invocationResult).copyHeaders(
|
||||
cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(inputMessage, invocationResult))
|
||||
.build();
|
||||
|
||||
return message;
|
||||
}
|
||||
};
|
||||
function.setOutputMessageHeaderEnricher(invocationResultHeaderEnricher);
|
||||
}
|
||||
|
||||
return (T) function;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private Object discoverFunctionInBeanFactory(String functionName) {
|
||||
Object functionCandidate = null;
|
||||
if (this.applicationContext.containsBean(functionName)) {
|
||||
|
||||
@@ -32,6 +32,7 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
@@ -320,6 +321,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
*/
|
||||
private Function<Object, Message> enhancer;
|
||||
|
||||
private BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher;
|
||||
|
||||
void setOutputMessageHeaderEnricher(BiFunction<Message<?>, Object, Message<?>> outputMessageHeaderEnricher) {
|
||||
this.outputMessageHeaderEnricher = outputMessageHeaderEnricher;
|
||||
}
|
||||
|
||||
FunctionInvocationWrapper(FunctionInvocationWrapper function) {
|
||||
this.target = function.target;
|
||||
this.inputType = function.inputType;
|
||||
@@ -615,7 +622,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
|
||||
}
|
||||
else {
|
||||
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
|
||||
if (this.outputMessageHeaderEnricher != null) {
|
||||
result = this.outputMessageHeaderEnricher.apply((Message<?>) input, result);
|
||||
}
|
||||
else {
|
||||
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
@@ -75,6 +75,7 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public CloudEventAtttributesProvider cloudEventAttributesProvider() {
|
||||
return new DefaultCloudEventAttributesProvider();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user