Clean up and simplifications around CloudEvent processing

This commit effectively a merge of work with #607 and simplifies the following
- CloudEventAttributesProvider now provides CloudEventAttributes initialized with defaults to be set by the user
- In HTTP RequestProcessor the logic of sanitizing headers was improved to ensure that correct prefix is applied

Resolves #607
This commit is contained in:
Oleg Zhurakousky
2020-11-16 13:40:53 +01:00
parent 5ff37ff378
commit 8ea309c45b
11 changed files with 152 additions and 206 deletions

View File

@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -32,6 +33,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
@@ -195,8 +197,10 @@ public class RequestProcessor {
result = Mono.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> {
builder.headers(HeaderUtils.sanitize(request.headers()));
addHeaders(builder, value);
if (!isValidCloudEvent(value.getHeaders().keySet())) {
builder.headers(HeaderUtils.sanitize(request.headers()));
}
})
.map(message -> message.getPayload());
}
@@ -204,8 +208,10 @@ public class RequestProcessor {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> {
builder.headers(HeaderUtils.sanitize(request.headers()));
addHeaders(builder, value);
if (!isValidCloudEvent(value.getHeaders().keySet())) {
builder.headers(HeaderUtils.sanitize(request.headers()));
}
})
.map(message -> message.getPayload());
}
@@ -225,6 +231,13 @@ public class RequestProcessor {
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
}
public boolean isValidCloudEvent(Set<String> headerKeys) {
return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)
&& headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION);
}
// this seem to be very relevant to AWS container tests
private Flux<?> messages(FunctionWrapper request, Object function, Flux<?> flux) {
Map<String, Object> headers = new HashMap<>(HeaderUtils.fromHttp(request.headers()));