Polishing cleanup and javadoc
This commit is contained in:
@@ -202,6 +202,17 @@ public final class CloudEventMessageUtils {
|
||||
return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will attempt to convert 'inputMessage' to a binary-mode Cloud Event {@link Message}.
|
||||
* This typically happens when 'inputMessage' represents Cloud Event in structured-mode.
|
||||
* <br>
|
||||
* In the event the message already represents Cloud Event in binary-mode, or this
|
||||
* message does not represent Cloud Event at all, it will be returned unchanged.
|
||||
*
|
||||
* @param inputMessage instance of incoming {@link Message}
|
||||
* @param messageConverter instance of {@link MessageConverter} to assist with type conversion.
|
||||
* @return instance of {@link Message} representing Cloud Event in binary-mode or unchanged 'inputMessage'.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Message<?> toBinary(Message<?> inputMessage, MessageConverter messageConverter) {
|
||||
|
||||
@@ -219,6 +230,7 @@ public final class CloudEventMessageUtils {
|
||||
: MimeTypeUtils.APPLICATION_JSON_VALUE;
|
||||
|
||||
String suffix = contentType.getSubtypeSuffix();
|
||||
Assert.hasText(suffix, "Content-type 'suffix' can not be determined from " + contentType);
|
||||
MimeType cloudEventDeserializationContentType = MimeTypeUtils
|
||||
.parseMimeType(contentType.getType() + "/" + suffix);
|
||||
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
|
||||
@@ -238,6 +250,13 @@ public final class CloudEventMessageUtils {
|
||||
return inputMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will attempt to determine based on the headers the origin of Message (e.g., HTTP, Kafka etc)
|
||||
* and based on this designate prefix to be used for Cloud Events attributes (i.e., `ce-` or `ce_` etc).
|
||||
*
|
||||
* @param messageHeaders instance of {@link MessageHeaders}
|
||||
* @return prefix to be used for Cloud Events attributes
|
||||
*/
|
||||
public static String determinePrefixToUse(MessageHeaders messageHeaders) {
|
||||
Set<String> keys = messageHeaders.keySet();
|
||||
if (keys.contains("user-agent")) {
|
||||
|
||||
@@ -47,6 +47,7 @@ import reactor.util.function.Tuples;
|
||||
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
@@ -910,13 +911,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
* case that requires it since it may contain forwarding url
|
||||
*/
|
||||
private boolean containsRetainMessageSignalInHeaders(Message message) {
|
||||
for (String headerName : message.getHeaders().keySet()) {
|
||||
if (headerName.startsWith("lambda") ||
|
||||
headerName.startsWith("scf-func-name")) {
|
||||
return true;
|
||||
}
|
||||
if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) {
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
for (String headerName : message.getHeaders().keySet()) {
|
||||
if (headerName.startsWith("lambda") ||
|
||||
headerName.startsWith("scf-func-name")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2020 the original author or authors.
|
||||
* Copyright 2017-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -34,7 +34,6 @@ import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
@@ -59,16 +58,11 @@ public class RequestProcessor {
|
||||
|
||||
private static Log logger = LogFactory.getLog(RequestProcessor.class);
|
||||
|
||||
|
||||
private final FunctionCatalog functionCatalog;
|
||||
|
||||
private final JsonMapper mapper;
|
||||
|
||||
public RequestProcessor(FunctionCatalog functionCatalog,
|
||||
ObjectProvider<JsonMapper> mapper,
|
||||
public RequestProcessor(ObjectProvider<JsonMapper> mapper,
|
||||
ObjectProvider<ServerCodecConfigurer> codecs) {
|
||||
this.mapper = mapper.getIfAvailable();
|
||||
this.functionCatalog = functionCatalog;
|
||||
}
|
||||
|
||||
public static FunctionWrapper wrapper(FunctionInvocationWrapper function) {
|
||||
@@ -186,35 +180,27 @@ public class RequestProcessor {
|
||||
private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler,
|
||||
Publisher<?> result, Boolean single, boolean getter) {
|
||||
BodyBuilder builder = ResponseEntity.ok();
|
||||
if (((FunctionInvocationWrapper) handler).isInputTypeMessage()) {
|
||||
result = Flux.from(result)
|
||||
.map(message -> MessageUtils.unpack(handler, message))
|
||||
.doOnNext(value -> addHeaders(builder, value))
|
||||
.map(message -> message.getPayload());
|
||||
if (result instanceof Mono) {
|
||||
result = Mono.from(result)
|
||||
.map(message -> MessageUtils.unpack(handler, message))
|
||||
.doOnNext(value -> {
|
||||
addHeaders(builder, value);
|
||||
if (!isValidCloudEvent(value.getHeaders().keySet())) {
|
||||
builder.headers(HeaderUtils.sanitize(request.headers()));
|
||||
}
|
||||
})
|
||||
.map(message -> message.getPayload());
|
||||
}
|
||||
else {
|
||||
if (result instanceof Mono) {
|
||||
result = Mono.from(result)
|
||||
.map(message -> MessageUtils.unpack(handler, message))
|
||||
.doOnNext(value -> {
|
||||
addHeaders(builder, value);
|
||||
if (!isValidCloudEvent(value.getHeaders().keySet())) {
|
||||
builder.headers(HeaderUtils.sanitize(request.headers()));
|
||||
}
|
||||
})
|
||||
.map(message -> message.getPayload());
|
||||
}
|
||||
else {
|
||||
result = Flux.from(result)
|
||||
.map(message -> MessageUtils.unpack(handler, message))
|
||||
.doOnNext(value -> {
|
||||
addHeaders(builder, value);
|
||||
if (!isValidCloudEvent(value.getHeaders().keySet())) {
|
||||
builder.headers(HeaderUtils.sanitize(request.headers()));
|
||||
}
|
||||
})
|
||||
.map(message -> message.getPayload());
|
||||
}
|
||||
result = Flux.from(result)
|
||||
.map(message -> MessageUtils.unpack(handler, message))
|
||||
.doOnNext(value -> {
|
||||
addHeaders(builder, value);
|
||||
if (!isValidCloudEvent(value.getHeaders().keySet())) {
|
||||
builder.headers(HeaderUtils.sanitize(request.headers()));
|
||||
}
|
||||
})
|
||||
.map(message -> message.getPayload());
|
||||
}
|
||||
|
||||
if (isOutputSingle(handler)
|
||||
@@ -231,7 +217,7 @@ public class RequestProcessor {
|
||||
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
|
||||
}
|
||||
|
||||
public boolean isValidCloudEvent(Set<String> headerKeys) {
|
||||
private boolean isValidCloudEvent(Set<String> headerKeys) {
|
||||
return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)
|
||||
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)
|
||||
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)
|
||||
|
||||
@@ -102,7 +102,6 @@ class FunctionEndpointInitializer implements ApplicationContextInitializer<Gener
|
||||
private void registerEndpoint(GenericApplicationContext context) {
|
||||
context.registerBean(RequestProcessor.class,
|
||||
() -> new RequestProcessor(
|
||||
context.getBean(FunctionCatalog.class),
|
||||
context.getBeanProvider(JsonMapper.class),
|
||||
context.getBeanProvider(ServerCodecConfigurer.class)));
|
||||
context.registerBean(FunctionEndpointFactory.class,
|
||||
|
||||
@@ -114,10 +114,7 @@ public final class FunctionWebUtils {
|
||||
result = ((Mono) result).map(v -> postProcessResult(v, isMessage));
|
||||
}
|
||||
else if (result instanceof Message) {
|
||||
if (!isMessage) {
|
||||
// result = ((Message) result).getPayload();
|
||||
}
|
||||
else if (((Message) result).getPayload() instanceof byte[]) {
|
||||
if (((Message) result).getPayload() instanceof byte[]) {
|
||||
String str = new String((byte[]) ((Message) result).getPayload());
|
||||
result = MessageBuilder.withPayload(str).copyHeaders(((Message) result).getHeaders()).build();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user