GH-588 Fix dropped headers during input conversion

This addresses the issue of dropped Message headers in the event where input is a Message but input conversion is not necessary
while Message contains headers that require explicit propagation (e.g., scf-sink-url, scf-func-name)

Resolves #588
This commit is contained in:
Oleg Zhurakousky
2020-09-18 18:46:33 +02:00
parent 42e7e9ddfb
commit 6df15f8a8a
2 changed files with 31 additions and 3 deletions

View File

@@ -736,8 +736,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
Publisher<?> result = publisher instanceof Mono
? Mono.from(publisher).map(value -> this.convertInputValueIfNecessary(value, type))
: Flux.from(publisher).map(value -> this.convertInputValueIfNecessary(value, type));
? Mono.from(publisher).map(value -> this.convertInputValueIfNecessary(value, type)).doOnError(v -> {
v.printStackTrace();
})
: Flux.from(publisher).map(value -> this.convertInputValueIfNecessary(value, type)).doOnError(v -> {
v.printStackTrace();
});
return result;
}
@@ -793,7 +797,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
if (this.payloadIsSpecialType(((Message<?>) value).getPayload())) {
return null;
}
convertedValue = ((Message<?>) convertedValue).getPayload();
if (!((Message<?>) convertedValue).getHeaders().containsKey("scf-sink-url")) {
convertedValue = ((Message<?>) convertedValue).getPayload();
}
}
}
else if (rawType instanceof Class<?>) { // see AWS adapter with WildardTypeImpl and Azure with Voids

View File

@@ -36,6 +36,7 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.HybridFunctionalRegistrationTests.UppercaseFunction;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.JsonMessageConverter;
import org.springframework.cloud.function.context.config.NegotiatingMessageConverterWrapper;
@@ -81,6 +82,27 @@ public class SimpleFunctionRegistryTests {
this.conversionService = new DefaultConversionService();
}
@SuppressWarnings("unchecked")
@Test
public void testSCF588() {
UpperCase function = new UpperCase();
FunctionRegistration<UpperCase> registration = new FunctionRegistration<>(
function, "foo").type(FunctionType.of(UppercaseFunction.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter);
catalog.register(registration);
FunctionInvocationWrapper lookedUpFunction = catalog.lookup("uppercase");
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader("scf-sink-url", "blah")
.setHeader("scf-func-name", "blah")
.build();
Object result = lookedUpFunction.apply(message);
assertThat(result).isInstanceOf(Message.class);
assertThat(((Message<String>) result).getPayload()).isEqualTo("HELLO");
}
@Test
public void testFunctionLookup() {