From 97ba22da76bf7981cf6dca7d9d4fe92cf24a7436 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 17 Nov 2020 08:01:21 +0100 Subject: [PATCH] Polishing cleanup and javadoc --- .../cloudevent/CloudEventMessageUtils.java | 19 ++++++ .../catalog/SimpleFunctionRegistry.java | 18 ++++-- .../cloud/function/web/RequestProcessor.java | 58 +++++++------------ .../function/FunctionEndpointInitializer.java | 1 - .../function/web/util/FunctionWebUtils.java | 5 +- 5 files changed, 54 insertions(+), 47 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index af30a2bb8..cd2cda4f0 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -202,6 +202,17 @@ public final class CloudEventMessageUtils { return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type); } + /** + * Will attempt to convert 'inputMessage' to a binary-mode Cloud Event {@link Message}. + * This typically happens when 'inputMessage' represents Cloud Event in structured-mode. + *
+ * In the event the message already represents Cloud Event in binary-mode, or this + * message does not represent Cloud Event at all, it will be returned unchanged. + * + * @param inputMessage instance of incoming {@link Message} + * @param messageConverter instance of {@link MessageConverter} to assist with type conversion. + * @return instance of {@link Message} representing Cloud Event in binary-mode or unchanged 'inputMessage'. + */ @SuppressWarnings("unchecked") public static Message toBinary(Message inputMessage, MessageConverter messageConverter) { @@ -219,6 +230,7 @@ public final class CloudEventMessageUtils { : MimeTypeUtils.APPLICATION_JSON_VALUE; String suffix = contentType.getSubtypeSuffix(); + Assert.hasText(suffix, "Content-type 'suffix' can not be determined from " + contentType); MimeType cloudEventDeserializationContentType = MimeTypeUtils .parseMimeType(contentType.getType() + "/" + suffix); Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) @@ -238,6 +250,13 @@ public final class CloudEventMessageUtils { return inputMessage; } + /** + * Will attempt to determine based on the headers the origin of Message (e.g., HTTP, Kafka etc) + * and based on this designate prefix to be used for Cloud Events attributes (i.e., `ce-` or `ce_` etc). + * + * @param messageHeaders instance of {@link MessageHeaders} + * @return prefix to be used for Cloud Events attributes + */ public static String determinePrefixToUse(MessageHeaders messageHeaders) { Set keys = messageHeaders.keySet(); if (keys.contains("user-agent")) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 99f92421e..d7999a9d2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -47,6 +47,7 @@ import reactor.util.function.Tuples; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.function.cloudevent.CloudEventAttributes; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; @@ -910,13 +911,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * case that requires it since it may contain forwarding url */ private boolean containsRetainMessageSignalInHeaders(Message message) { - for (String headerName : message.getHeaders().keySet()) { - if (headerName.startsWith("lambda") || - headerName.startsWith("scf-func-name")) { - return true; - } + if (new CloudEventAttributes(message.getHeaders()).isValidCloudEvent()) { + return true; + } + else { + for (String headerName : message.getHeaders().keySet()) { + if (headerName.startsWith("lambda") || + headerName.startsWith("scf-func-name")) { + return true; + } + } + return false; } - return false; } /* diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index f7b8c0084..d5165e013 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2020 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ import reactor.core.publisher.Mono; import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; -import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.message.MessageUtils; @@ -59,16 +58,11 @@ public class RequestProcessor { private static Log logger = LogFactory.getLog(RequestProcessor.class); - - private final FunctionCatalog functionCatalog; - private final JsonMapper mapper; - public RequestProcessor(FunctionCatalog functionCatalog, - ObjectProvider mapper, + public RequestProcessor(ObjectProvider mapper, ObjectProvider codecs) { this.mapper = mapper.getIfAvailable(); - this.functionCatalog = functionCatalog; } public static FunctionWrapper wrapper(FunctionInvocationWrapper function) { @@ -186,35 +180,27 @@ public class RequestProcessor { private Mono> response(FunctionWrapper request, Object handler, Publisher result, Boolean single, boolean getter) { BodyBuilder builder = ResponseEntity.ok(); - if (((FunctionInvocationWrapper) handler).isInputTypeMessage()) { - result = Flux.from(result) - .map(message -> MessageUtils.unpack(handler, message)) - .doOnNext(value -> addHeaders(builder, value)) - .map(message -> message.getPayload()); + if (result instanceof Mono) { + result = Mono.from(result) + .map(message -> MessageUtils.unpack(handler, message)) + .doOnNext(value -> { + addHeaders(builder, value); + if (!isValidCloudEvent(value.getHeaders().keySet())) { + builder.headers(HeaderUtils.sanitize(request.headers())); + } + }) + .map(message -> message.getPayload()); } else { - if (result instanceof Mono) { - result = Mono.from(result) - .map(message -> MessageUtils.unpack(handler, message)) - .doOnNext(value -> { - addHeaders(builder, value); - if (!isValidCloudEvent(value.getHeaders().keySet())) { - builder.headers(HeaderUtils.sanitize(request.headers())); - } - }) - .map(message -> message.getPayload()); - } - else { - result = Flux.from(result) - .map(message -> MessageUtils.unpack(handler, message)) - .doOnNext(value -> { - addHeaders(builder, value); - if (!isValidCloudEvent(value.getHeaders().keySet())) { - builder.headers(HeaderUtils.sanitize(request.headers())); - } - }) - .map(message -> message.getPayload()); - } + result = Flux.from(result) + .map(message -> MessageUtils.unpack(handler, message)) + .doOnNext(value -> { + addHeaders(builder, value); + if (!isValidCloudEvent(value.getHeaders().keySet())) { + builder.headers(HeaderUtils.sanitize(request.headers())); + } + }) + .map(message -> message.getPayload()); } if (isOutputSingle(handler) @@ -231,7 +217,7 @@ public class RequestProcessor { return Mono.from(result).flatMap(body -> Mono.just(builder.body(body))); } - public boolean isValidCloudEvent(Set headerKeys) { + private boolean isValidCloudEvent(Set headerKeys) { return headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID) && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE) && headerKeys.contains(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java index bc3f12331..5e3e4a0de 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java @@ -102,7 +102,6 @@ class FunctionEndpointInitializer implements ApplicationContextInitializer new RequestProcessor( - context.getBean(FunctionCatalog.class), context.getBeanProvider(JsonMapper.class), context.getBeanProvider(ServerCodecConfigurer.class))); context.registerBean(FunctionEndpointFactory.class, diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java index ca0c5ca9f..112d9ff62 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java @@ -114,10 +114,7 @@ public final class FunctionWebUtils { result = ((Mono) result).map(v -> postProcessResult(v, isMessage)); } else if (result instanceof Message) { - if (!isMessage) { -// result = ((Message) result).getPayload(); - } - else if (((Message) result).getPayload() instanceof byte[]) { + if (((Message) result).getPayload() instanceof byte[]) { String str = new String((byte[]) ((Message) result).getPayload()); result = MessageBuilder.withPayload(str).copyHeaders(((Message) result).getHeaders()).build(); }